Skip to content

Commit 695d2f5

Browse files
committed
Lab10: added multithreaded file processing.
1 parent e4f7ddf commit 695d2f5

File tree

2 files changed

+89
-2
lines changed

2 files changed

+89
-2
lines changed

docs/src/lecture_10/lab.md

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,93 @@ end
407407

408408
### Multithreaded file processing
409409

410+
```@raw html
411+
<div class="admonition is-category-exercise">
412+
<header class="admonition-header">Exercise</header>
413+
<div class="admonition-body">
414+
```
415+
Write a multithreaded analog of the file processing pipeline from [exercise](@ref lab10_dist_file_p) above. We have already implemented most of the code that you will need (available as source code [here](source code)). **TODO**
416+
417+
Your job is to write the `map` and `reduce` steps, that will gather the dictionaries from different workers. There are two ways to map
418+
- either over directories inside `.julia/packages/`
419+
- or over all files obtained by concatenation of `filter_jl` outputs (*NOTE* that this might not be possible if the listing itself is expensive - speed or memory requirements)
420+
Measure if the speed up scales linearly with the number of threads in each case. (*NOTE* that this )
421+
422+
423+
**HINTS**:
424+
- create a separate dictionary for each thread in order to avoid the need for atomic operations
425+
426+
427+
**BONUS**:
428+
In each of the cases count how many files/pkgs each thread processed. Would the dynamic scheduler help us in this situation?
429+
430+
```@raw html
431+
</div></div>
432+
<details class = "solution-body">
433+
<summary class = "solution-header">Solution:</summary><p>
434+
```
435+
436+
```julia
437+
using Base.Threads
438+
include("./pkg_processing.jl")
439+
440+
"""
441+
merge_with!(h1, h2)
442+
443+
Merges count dictionary `h2` into `h1` by adding the counts.
444+
"""
445+
function merge_with!(h1, h2)
446+
for s in keys(h2)
447+
get!(h1, s, 0)
448+
h1[s] += h2[s]
449+
end
450+
h1
451+
end
452+
```
453+
454+
Firstly the version with folder-wise parallelism.
455+
```julia
456+
function threaded_histogram_pkgwise(path)
457+
ht = [Dict{Symbol, Int}() for _ in 1:nthreads()]
458+
@threads for pkg_dir in sample_all_installed_pkgs(path)
459+
h = ht[threadid()]
460+
for jl_path in filter_jl(pkg_dir)
461+
syms = tokenize(jl_path)
462+
for s in syms
463+
v = get!(h, s, 0)
464+
h[s] += 1
465+
end
466+
end
467+
end
468+
reduce(merge_with!, ht)
469+
end
470+
path = joinpath(DEPOT_PATH[1], "packages")
471+
@time h = threaded_histogram_pkgwise(path)
472+
```
473+
474+
Secondly the version with file-wise parallelism.
475+
```julia
476+
function threaded_histogram_filewise(path)
477+
jl_files = reduce(vcat, filter_jl(pkg_dir) for pkg_dir in sample_all_installed_pkgs(path))
478+
ht = [Dict{Symbol, Int}() for _ in 1:nthreads()]
479+
@threads for jl_path in jl_files
480+
h = ht[threadid()]
481+
syms = tokenize(jl_path)
482+
for s in syms
483+
v = get!(h, s, 0)
484+
h[s] += 1
485+
end
486+
end
487+
reduce(merge_with!, ht)
488+
end
489+
path = joinpath(DEPOT_PATH[1], "packages")
490+
@time h = threaded_histogram_filewise(path)
491+
```
492+
493+
```@raw html
494+
</p></details>
495+
```
496+
410497
## Task switching
411498
There is a way how to run "multiple" things at once, which does not necessarily involve either threads or processes. In Julia this concept is called task switching or asynchronous programming, where we fire off our requests in a short time and let the cpu/os/network handle the distribution. As an example which we will try today is querrying a web API, which has some variable latency. In the usuall sequantial fashion we can always post querries one at a time, however generally the APIs can handle multiple request at a time, therefore in order to better utilize them, we can call them asynchronously and fetch all results later, in some cases this will be faster.
412499

docs/src/lecture_10/pkg_processing.jl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ Returns root folders of all installed packages in the system. Package version is
55
"""
66
function sample_all_installed_pkgs(path::AbstractString)
77
pkgs = readdir(path)
8-
# (rand(readdir(joinpath(path, p), join=true)) for p in pkgs) # sampling version
9-
(readdir(joinpath(path, p), join=true)[1] for p in pkgs) # deterministic version
8+
# [rand(readdir(joinpath(path, p), join=true)) for p in pkgs] # sampling version
9+
[readdir(joinpath(path, p), join=true)[1] for p in pkgs] # deterministic version
1010
end
1111

1212
"""

0 commit comments

Comments
 (0)