# Design Doc: Concurrent Programming with Fluid

With PaddlePaddle Fluid, users describe a program rather than a model. The program is a [`ProgramDesc`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/framework.proto) protobuf message. TensorFlow/MxNet/Caffe2 applications generate protobuf messages too, but their messages represent the model, a graph of operators, rather than the program that trains or uses the model.

Many know that when we program TensorFlow, we can specify the device on which each operator runs. This allows us to create concurrent/parallel AI applications. An interesting question is: **how does a `ProgramDesc` represent a concurrent program?**

The answer relies on the fact that a `ProgramDesc` is similar to an abstract syntax tree (AST) that describes a program. So users can write a concurrent program in Fluid just as they would in any concurrent programming language, e.g., [Go](https://golang.org).
## An Analogy

The following table compares concepts in Fluid and Go:

| Go | Fluid |
|----|-------|
| user-defined functions | [layers](https://github.com/PaddlePaddle/Paddle/tree/develop/python/paddle/v2/fluid) |
| control-flow and built-in functions | [intrinsics/operators](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/operators) |
| goroutines, channels | [class ThreadPool](https://github.com/PaddlePaddle/Paddle/tree/develop/paddle/framework/thread_pool.h) |
| runtime | [class Executor](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/executor.h) |

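For readers less familiar with Go, here is a minimal, self-contained example of the goroutines and channels whose role `class ThreadPool` plays in Fluid. The `double` worker is purely illustrative:

```go
package main

import "fmt"

// double reads integers from in, doubles each, and writes the results
// to out. It runs as a goroutine, Go's lightweight (green) thread.
func double(in <-chan int, out chan<- int) {
    for v := range in {
        out <- v * 2
    }
    close(out)
}

func main() {
    in := make(chan int)
    out := make(chan int)

    go double(in, out) // spawn a concurrent worker

    go func() { // feed the worker, then close the channel
        for i := 1; i <= 3; i++ {
            in <- i
        }
        close(in)
    }()

    for v := range out { // receive until the worker closes out
        fmt.Println(v) // prints 2, 4, 6
    }
}
```

Because the channels are unbuffered, the sender, worker, and receiver proceed in lockstep; the runtime multiplexes all three goroutines onto OS threads.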
## An Example Concurrent Program

To review the above concepts with an example, let us take a simple program and write its distributed version.

Suppose that we want to parallelize a naive Fluid program (written in Go and calling Fluid's Go binding) that multiplies two tensors.

```go
import "fluid"

func paddlepaddle() {
    X := fluid.read(...)
    W := fluid.Tensor(...)
    Y := fluid.mult(X, W)
}
```

Please be aware that Fluid's Go binding provides the default `main` function, which calls the `paddlepaddle` function, which, in this case, is defined in the above program and creates the following `ProgramDesc` message.

```protobuf
message ProgramDesc {
  block[0] = Block {
    vars = [X, W, Y],
    ops = [
      read(output = X)
      assign(input = ..., output = W)
      mult(input = {X, W}, output = Y)
    ],
  }
}
```

Then, the default `main` function calls `fluid.run()`, which creates an instance of the [`class Executor`](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/executor.h) and calls `Executor.Run(block[0])`, where `block[0]` is the first and only block defined in the above `ProgramDesc` message.

The default `main` function is defined as follows:

```go
func main() {
    paddlepaddle()
    fluid.run()
}
```
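
To make the executor's role concrete, here is a sketch, in Go for consistency with the other examples, of how an executor runs a block by invoking each op's `Run` method in order. The real `class Executor` is C++, and all names below are illustrative assumptions, not Fluid's actual API:

```go
package main

import "fmt"

// Op stands in for a Fluid intrinsic/operator; real operators expose a
// Run method that the executor invokes.
type Op interface {
    Run(scope map[string]interface{})
}

// Block mirrors the ops array of one ProgramDesc block.
type Block struct {
    Ops []Op
}

// Executor runs a block by executing its ops sequentially on the
// current thread; it does no planning and creates no threads.
type Executor struct{}

func (e *Executor) Run(b *Block, scope map[string]interface{}) {
    for _, op := range b.Ops {
        op.Run(scope)
    }
}

// traceOp is a toy operator that records its name in the scope.
type traceOp struct{ name string }

func (t traceOp) Run(scope map[string]interface{}) {
    scope["trace"] = append(scope["trace"].([]string), t.name)
}

func main() {
    scope := map[string]interface{}{"trace": []string{}}
    b := &Block{Ops: []Op{traceOp{"read"}, traceOp{"assign"}, traceOp{"mult"}}}
    (&Executor{}).Run(b, scope)
    fmt.Println(scope["trace"]) // [read assign mult]
}
```

Any concurrency therefore has to come from individual operators, not from the executor loop itself.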

## The Concurrent Version

By parallelizing the above program, we could support a very big tensor X by splitting it into small pieces {x_1, x_2, ...} and sending each piece to a worker process/node for parallel multiplication.

In this case, we can write a transpiler that takes a `ProgramDesc` message representing the above example program and outputs two `ProgramDesc` messages, one to run on the master process/node and the other on worker processes/nodes.
| 67 | + |
| 68 | +### The Master Program |
| 69 | + |
| 70 | +The master program could look like the following: |
| 71 | + |
| 72 | +```protobuf |
| 73 | +message ProgramDesc { |
| 74 | + block[0] = Block { |
| 75 | + vars = [X, L, Y], |
| 76 | + ops = [ |
| 77 | + read(output = X) |
| 78 | + kube_get_workers_addrs(output = L) |
| 79 | + Y = tensor_array(len(L)) |
| 80 | + parallel_for(input = X, output = Y, |
| 81 | + attrs = {L, block_id(1)}) # referring to block 1 |
| 82 | + ] |
| 83 | + } |
| 84 | + |
| 85 | + block[1] = Block { |
| 86 | + parent = 0, |
| 87 | + vars = [x, y, index], |
| 88 | + ops = [ |
| 89 | + slice(input = [X, index], output = x) # index is initialized by parallel_for |
| 90 | + send(input = x, attrs = L[index]) |
| 91 | + recv(outputs = y, attrs = L[index]) |
| 92 | + assign(input = y, output = Y[index]) |
| 93 | + ] |
| 94 | + } |
| 95 | +} |
| 96 | +``` |

The equivalent Fluid program (calling the Go binding) is:

```go
func main() { //// block 0
    X := fluid.read(...)
    L := fluid.k8s.get_worker_addrs()
    Y := fluid.tensor_array(len(L))
    fluid.parallel_for(X, L,
        func(index int) { //// block 1
            x := X[index]
            fluid.send(L[index], x)
            y := fluid.recv(L[index])
            Y[index] = y
        })
}
```

An explanation of the above program:

- `fluid.k8s` is a package that provides access to the Kubernetes API.
- `fluid.k8s.get_worker_addrs` returns the list of IP addresses and ports of all pods of the current job except for the current one (the master pod).
- `fluid.tensor_array` creates a [tensor array](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/lod_tensor_array.h). `fluid.parallel_for` creates a `ParallelFor` intrinsic, which, when executed,

  1. creates `len(L)` scopes, one for each concurrent run of the sub-block (block 1 in this case), and initializes a variable named "index" in each scope to an integer value in the range `[0, len(L)-1]`, and
  2. creates `len(L)` threads by calling into the `ThreadPool` singleton; each thread
     1. creates an Executor instance, and
     2. calls `Executor.Run(block)`, where `block` is block 1 as explained above.

Please be aware that block 1 is a sub-block of block 0, so ops in block 1 can refer to variables defined in block 0.
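
The scope-per-iteration behavior of `ParallelFor` can be sketched in plain Go. Here goroutines and a `WaitGroup` stand in for `ThreadPool`, and a simple map stands in for a Fluid scope; all names are assumptions for illustration:

```go
package main

import (
    "fmt"
    "sync"
)

// parallelFor creates n scopes, initializes "index" in each, and runs
// body once per scope on its own goroutine, waiting for all to finish.
func parallelFor(n int, body func(scope map[string]int)) {
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        scope := map[string]int{"index": i} // one scope per concurrent run
        wg.Add(1)
        go func(s map[string]int) {
            defer wg.Done()
            body(s)
        }(scope)
    }
    wg.Wait()
}

func main() {
    results := make([]int, 4)
    parallelFor(4, func(scope map[string]int) {
        i := scope["index"]
        results[i] = i * i // each goroutine writes only its own element
    })
    fmt.Println(results) // [0 1 4 9]
}
```

Giving each iteration its own scope is what keeps the concurrent runs of the sub-block from interfering with each other's variables.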

### The Worker Program

The worker program looks like

```go
func main() {
    W := fluid.Tensor(...)
    fluid.listen_and_do(
        fluid.k8s.self_addr(),
        func(input Tensor) {
            output = fluid.mult(input, W)
        })
}
```

| 141 | + |
| 142 | +where |
| 143 | + |
| 144 | +- `fluid.listen_and_do` creates a `ListenAndDo` intrinsic, which, when executed, |
| 145 | + 1. listens on the current pod's IP address, as returned by `fliud.k8s.self_addr()`, |
| 146 | + 2. once a connection is established, |
| 147 | + 1. creates a scope of two parameters, "input" and "output", |
| 148 | + 2. reads a [Fluid variable](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/variable.h) and saves it into "input", |
| 149 | + 3. creates an Executor instance and calls `Executor.Run(block)`, where the block is generated by running the lambda specified as the second parameter of `fluid.listen_and_do`. |

## Summary

From the above example, we see that:

1. Fluid enables the imperative programming paradigm by:
   1. letting users describe a program, but not a model (a sequence of layers, or a graph of operators), and
   2. calling the `fluid.run` function that runs the program implicitly.
2. The program is described as a `ProgramDesc` protobuf message.
3. Function `Executor.Run` takes a block, instead of a `ProgramDesc`, as its parameter.
4. `fluid.run` calls `Executor.Run` to run the first block in the `ProgramDesc` message.
5. `Executor.Run`'s implementation is extremely simple -- it doesn't plan the execution or create threads; instead, it runs on the current thread and executes intrinsics/operators' `Run` methods sequentially, as they appear in the `Block.ops` array.
6. Intrinsics/operators' `Run` methods might create threads. For example, the `ListenAndDo` operator creates a thread to handle each incoming request.
7. Threads are not necessarily OS threads; instead, they could be [green threads](https://en.wikipedia.org/wiki/Green_threads) managed by `ThreadPool`. Multiple green threads might run on the same OS thread. An example of green threads is Go's [goroutines](https://tour.golang.org/concurrency/1).