Commit a863787

pevnak authored and janfrancu committed
description of channels and bit of their infrastructure
1 parent 93e5e29 commit a863787

1 file changed

+73
-21
lines changed

docs/src/lecture_10/lecture.md

Lines changed: 73 additions & 21 deletions
@@ -189,21 +189,24 @@ end
189189
julia> @btime juliaset_channels(-0.79, 0.15);
190190
254.151 μs (254 allocations: 987.09 KiB)
191191
```
192-
The execution tim is much higher then the we have observed in the previous cases and changing the number of workers do not help much. What went wrong? The reason is that setting up the infrastructure around remote channels is a costly process. Consider the following alternative, where (i) we let workers to run end-lessly and (ii) the channel infrastructure is set-up once and wrapped into an anonymous function
192+
The execution time is much higher than what we have observed in the previous cases, and changing the number of workers does not help much. What went wrong? The reason is that setting up the infrastructure around remote channels is a costly process. Consider the following alternative, where (i) we let workers run endlessly and (ii) the channel infrastructure is set up once and wrapped into an anonymous function
193193
```julia
194194
@everywhere begin
195195
function juliaset_channel_worker(instructions, results)
196-
while true
197-
c, n, cols = take!(instructions)
198-
put!(results, (cols, juliaset_columns(c, n, cols)))
196+
while true
197+
c, n, cols = take!(instructions)
198+
put!(results, (cols, juliaset_columns(c, n, cols)))
199+
end
199200
end
200201
end
201202

202203
function juliaset_init(x, y, n = 1000, np = nworkers())
203204
c = x + y*im
204205
columns = Iterators.partition(1:n, div(n, np))
205-
instructions = RemoteChannel(() -> Channel{Tuple}(np))
206-
results = RemoteChannel(()->Channel{Tuple}(np))
206+
T = Tuple{ComplexF64,Int64,UnitRange{Int64}}
207+
instructions = RemoteChannel(() -> Channel{T}(np))
208+
T = Tuple{UnitRange{Int64},Array{UInt8,2}}
209+
results = RemoteChannel(()->Channel{T}(np))
207210
foreach(p -> remote_do(juliaset_channel_worker, p, instructions, results), workers())
208211
function compute()
209212
img = Array{UInt8,2}(undef, n, n)
@@ -220,29 +223,72 @@ t = juliaset_init(-0.79, 0.15)
220223
julia> @btime t();
221224
17.697 ms (776 allocations: 1.94 MiB)
222225
```
223-
with which we obtain the comparable speed.
224-
Instead of `@spawnat` we can also use `remote_do` as foreach`(p -> remote_do(juliaset_channel_worker, p, instructions, results), workers)`, which executes the function `juliaset_channel_worker` at worker `p` with parameters `instructions` and `results` but does not return handle to receive the future results.
225-
226-
- Channels and their guarantees
227-
- How to orchestrate workers by channels
228-
- how to kill the remote process with channel
229-
230-
I can send indices of columns that should be calculated on remote processes over the queue, from which they can pick it up and send it back over the channel.
231-
232-
226+
with which we obtain a speed comparable to the `pmap` approach.
227+
!!! info
228+
### `remote_do` vs `remotecall`
229+
Instead of `@spawnat` (`remotecall`) we can also use `remote_do`, as in `foreach(p -> remote_do(juliaset_channel_worker, p, instructions, results), workers())`, which executes the function `juliaset_channel_worker` at worker `p` with parameters `instructions` and `results` but does not return a `Future` handle to retrieve the result.
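The difference in return values can be illustrated with a minimal sketch. As a simplifying assumption, everything runs on the current process (`myid()`) so that no workers need to be added; the anonymous function and the variable names are purely illustrative.

```julia
using Distributed

# Assumption: we target the current process; with workers added through
# `addprocs`, any worker id could be used instead.
pid = myid()

# `remotecall` returns a `Future`; `fetch` blocks until the result is available.
fut = remotecall(+, pid, 1, 2)
sum_result = fetch(fut)

# `remote_do` returns `nothing`, so the result has to travel through a channel.
results = RemoteChannel(() -> Channel{Int}(1))
ret = remote_do((ch, a, b) -> put!(ch, a + b), pid, results, 1, 2)
do_result = take!(results)
```

This is why the channel-based worker above does not need a `Future`: all of its output is delivered through the `results` channel.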
233230

231+
!!! info
232+
### `Channel` and `RemoteChannel`
233+
An `AbstractChannel` has to implement the interface `put!`, `take!`, `fetch`, `isready` and `wait`, i.e. it should behave like a queue. `Channel` is an implementation of an `AbstractChannel` that facilitates communication within a single process (for the purpose of multi-threading and task switching). A channel can be easily created by `Channel{T}(capacity)`, where the capacity can be infinite. The storage of a channel can be seen in its `data` field, but accessing it directly will of course break all guarantees, such as the atomicity of `take!` and `put!`. For communication between processes, the `<:AbstractChannel` has to be wrapped in a `RemoteChannel`. The constructor `RemoteChannel(f::Function, pid::Integer=myid())` takes as its first argument a function (without arguments) which constructs the `Channel` (or something like that) on the remote machine identified by `pid`, and it returns the `RemoteChannel`. The storage thus resides on the machine specified by `pid`, and the handle provided by the `RemoteChannel` can be freely passed to any process. (For the curious, the `ProcessGroup` `Distributed.PGRP` contains information about channels on machines.)
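The queue-like interface described here can be tried out with a small sketch. It assumes a single process, so the `RemoteChannel` is created on `myid()`; the element types and values are arbitrary.

```julia
using Distributed

# A local channel holding at most 2 integers; it behaves like a FIFO queue.
ch = Channel{Int}(2)
put!(ch, 1)
put!(ch, 2)
ready = isready(ch)        # the channel holds data
peeked = fetch(ch)         # returns the first item without removing it
first_item = take!(ch)     # removes and returns the first item
second_item = take!(ch)

# A RemoteChannel wraps a channel whose storage lives on process `pid`;
# here (assumption) it is the current process.
rch = RemoteChannel(() -> Channel{String}(1), myid())
put!(rch, "hello")
msg = take!(rch)
```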
234234

235-
## tooling
236-
- how to set up workers,
237-
+ how to load functions, modules
238-
+ julia -p 16 -L load_my_script.jl
239-
- how to send data / how to define variable on remote process
235+
In the above example, `juliaset_channel_worker` defined as
236+
```julia
237+
function juliaset_channel_worker(instructions, results)
238+
while true
239+
c, n, cols = take!(instructions)
240+
put!(results, (cols, juliaset_columns(c, n, cols)))
241+
end
242+
end
243+
```
244+
runs forever due to the `while true` loop. To stop the computation, we usually extend the type accepted by the `instructions` channel to accept some stopping token (e.g. `:stop`), upon receiving which the worker leaves the loop.
245+
```julia
246+
function juliaset_channel_worker(instructions, results)
247+
while true
248+
i = take!(instructions)
249+
i === :stop && break
250+
c, n, cols = i
251+
put!(results, (cols, juliaset_columns(c, n, cols)))
252+
end
253+
put!(results, :stop)
254+
end
255+
```
256+
By default, Julia does not provide any facility to stop a remote execution other than `interrupt(pids::Integer...)`, which sends the equivalent of `ctrl-c` to the given workers.
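The stop-token pattern can be tried without any remote workers. The following is a minimal sketch that assumes a local `Channel` and an `@async` task in place of a `RemoteChannel` and a worker process; `square_worker` is a hypothetical stand-in for `juliaset_channel_worker`, and the channels use a `Union` element type so that they accept both the work items and the `:stop` token.

```julia
# Instructions are either an Int to process or the Symbol :stop.
instructions = Channel{Union{Int,Symbol}}(10)
results = Channel{Union{Int,Symbol}}(10)

# Hypothetical worker: squares every number until it receives :stop.
function square_worker(instructions, results)
    while true
        i = take!(instructions)
        i === :stop && break
        put!(results, i^2)
    end
    put!(results, :stop)   # acknowledge that the worker has finished
end

worker = @async square_worker(instructions, results)

foreach(i -> put!(instructions, i), 1:3)
put!(instructions, :stop)

# Collect results until the worker acknowledges the stop token.
collected = Int[]
while true
    r = take!(results)
    r === :stop && break
    push!(collected, r)
end
wait(worker)
```

With several workers reading from the same channel, one `:stop` token per worker would have to be sent, since each token is consumed by exactly one `take!`.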
240257

241258
## Sending data
242259
- Do not send `randn(1000, 1000)`
260+
- Sending references and ObjectID would not work
243261
- Serialization is very time consuming, an efficient conversion to something simple might be worth it
244262
- `Dict("a" => [1,2,3], "b" => [2,3,4,5])` -> (Array of elements, array of bounds, keys)
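The last item can be sketched as follows. The helpers `flatten_dict` and `unflatten_dict` are hypothetical names introduced only for illustration, and storing the index of the last element of each key as its bound is one possible choice among several.

```julia
# Hypothetical helper: flatten a Dict of vectors into three plain arrays
# (values, bounds, keys), which are simpler to serialize than the Dict.
function flatten_dict(d::Dict{String,Vector{Int}})
    ks = sort!(collect(keys(d)))                  # fix the key order
    vals = reduce(vcat, (d[k] for k in ks); init = Int[])
    bounds = cumsum([length(d[k]) for k in ks])   # last index belonging to each key
    return vals, bounds, ks
end

# Hypothetical inverse: rebuild the Dict on the receiving side.
function unflatten_dict(vals, bounds, ks)
    starts = [1; bounds[1:end-1] .+ 1]            # first index belonging to each key
    return Dict(k => vals[s:e] for (k, s, e) in zip(ks, starts, bounds))
end

d = Dict("a" => [1, 2, 3], "b" => [2, 3, 4, 5])
vals, bounds, ks = flatten_dict(d)
restored = unflatten_dict(vals, bounds, ks)
```

Whether the conversion pays off depends on the size and structure of the data, so it is worth measuring before committing to it.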
245263

264+
## Practical advice
265+
Recall that (i) workers are started as clean processes and (ii) they might not share the same environment with the main process. The latter is due to the fact that files describing the environment (`Project.toml` and `Manifest.toml`) might not be available on remote machines.
266+
We recommend:
267+
- to have a shared directory (shared home) with the code and to share the location of packages
268+
- to place all code for workers into one file, let's call it `worker.jl` (the author of these notes includes the code for the master there as well).
269+
- to put code activating the specified environment at the beginning of `worker.jl`, as
270+
```julia
271+
using Pkg
272+
Pkg.activate(@__DIR__)
273+
```
274+
and optionally
275+
```julia
276+
Pkg.resolve()
277+
Pkg.instantiate()
278+
```
279+
- to run Julia as
280+
```bash
281+
julia -p ?? -L worker.jl main.jl
282+
```
283+
where `main.jl` is the script to be executed on the main node. Or
284+
```bash
285+
julia -p ?? -L worker.jl -e "main()"
286+
```
287+
where `main()` is the function defined in `worker.jl` to be executed on the main node.
288+
289+
A complete example can be seen in [`juliaset_p.jl`](juliaset_p.jl).
290+
291+
246292
## Multi-Threading
247293
- Locks / lock-free multi-threading
248294
- Show the effect of different schedulers
@@ -392,6 +438,12 @@ julia> @btime juliaset(-0.79, 0.15, 1000, juliaset_folds!);
392438
10.421 ms (3582 allocations: 1.20 MiB)
393439
```
394440

441+
## Take away message
442+
When deciding what kind of parallelism to employ, consider the following:
443+
- for tightly coupled computation over shared data, multi-threading is more suitable, since no data has to be transferred between processes
444+
- but if the computation requires frequent allocation and freeing of memory, or IO, separate processes are more suitable, since garbage collectors are independent between processes
445+
- `Transducers` strives for (almost) the same code to support thread- and process-based parallelism.
446+
395447
### Materials
396448
- http://cecileane.github.io/computingtools/pages/notes1209.html
397449
- https://lucris.lub.lu.se/ws/portalfiles/portal/61129522/julia_parallel.pdf
