|
| 1 | +# select_op Design |
| 2 | + |
| 3 | +## Introduction |
| 4 | + |
| 5 | +In golang, the [**select**](https://golang.org/ref/spec#Select_statements) |
| 6 | +statement lets a goroutine wait on multiple communication operations at the |
| 7 | +same time. The **select** blocks until one of its cases can run, then |
| 8 | +executes the case. If multiple cases are ready to run, then one case is |
| 9 | +choosen at random to be executed. |
| 10 | + |
| 11 | +With the introduction of CSP for Paddle, we mimic this behavior by |
| 12 | +creating a ***select_op***. |
| 13 | + |
| 14 | +## How to use it |
| 15 | + |
| 16 | +The **select_op** is available as a c++ operator. However most users |
| 17 | +will prefer to use the much simplier Python API. |
| 18 | + |
| 19 | +- **fluid.Select()**: Creates a select operator and adds it to the current |
| 20 | +block within the main program. Also creates a sub block and adds it to the |
| 21 | +main program. This sub block is used to hold all variables and operators |
| 22 | +used by the case statements. |
| 23 | + |
| 24 | +Within the select block, users can add cases by |
| 25 | +calling **select.case** or **select.default** method. |
| 26 | + |
| 27 | +- **fluid.Select.case(channel_action, channel, result_variable)**: Represents |
| 28 | +a fluid channel send/recv case. This method creates a SelectCase block |
| 29 | +guard and adds it to the Select block. The arguments into this method tells |
| 30 | +the select which channel operation to listen to. |
| 31 | + |
| 32 | +- **fluid.Select.default()**: Represents the fluid default case. This default |
| 33 | +case is executed if none of the channel send/recv cases are available to |
| 34 | +execute. |
| 35 | + |
| 36 | +**Example:** |
| 37 | +``` |
| 38 | +ch1 = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR) |
| 39 | +quit_ch = fluid.make_channel(dtype=core.VarDesc.VarType.LOD_TENSOR) |
| 40 | + |
| 41 | +x = fill_constant(shape=[1], dtype=core.VarDesc.VarType.INT32, value=0) |
| 42 | +y = fill_constant(shape=[1], dtype=core.VarDesc.VarType.INT32, value=1) |
| 43 | + |
| 44 | +while_cond = fill_constant(shape=[1], dtype=core.VarDesc.VarType.BOOL, value=True) |
| 45 | +while_op = While(cond=while_cond) |
| 46 | + |
| 47 | +with while_op.block(): |
| 48 | + with fluid.Select() as select: |
| 49 | + with select.case(fluid.channel_send, channel, x): |
| 50 | + # Send x, then perform Fibonacci calculation on x and y |
| 51 | + x_tmp = fill_constant(shape=[1], dtype=core.VarDesc.VarType.INT32, value=0) |
| 52 | + assign(input=x, output=x_tmp) |
| 53 | + assign(input=y, output=x) |
| 54 | + assign(elementwise_add(x=x_tmp, y=y), output=y) |
| 55 | + with select.case(fluid.channel_recv, quit_channel, result2): |
| 56 | + # Exit out of While loop |
| 57 | + while_false = fill_constant(shape=[1], dtype=core.VarDesc.VarType.BOOL, value=False) |
| 58 | + helper = layer_helper.LayerHelper('assign') |
| 59 | + helper.append_op( |
| 60 | + type='assign', |
| 61 | + inputs={'X': [while_false]}, |
| 62 | + outputs={'Out': [while_cond]}) |
| 63 | +``` |
| 64 | + |
| 65 | +## How it Works |
| 66 | + |
| 67 | +### Program Description |
| 68 | + |
| 69 | +``` |
| 70 | +blocks { |
| 71 | + idx: 0 |
| 72 | + ... |
| 73 | + // Create "case_to_execute" variable |
| 74 | + ops { |
| 75 | + outputs { |
| 76 | + parameter: "Out" |
| 77 | + arguments: "fill_constant_110.tmp_0" |
| 78 | + } |
| 79 | + type: "fill_constant" |
| 80 | + attrs { |
| 81 | + name: "force_cpu" |
| 82 | + type: BOOLEAN |
| 83 | + b: false |
| 84 | + } |
| 85 | + attrs { |
| 86 | + name: "value" |
| 87 | + type: FLOAT |
| 88 | + f: -1.0 |
| 89 | + } |
| 90 | + attrs { |
| 91 | + name: "shape" |
| 92 | + type: INTS |
| 93 | + ints: 1 |
| 94 | + } |
| 95 | + attrs { |
| 96 | + name: "dtype" |
| 97 | + type: INT |
| 98 | + i: 2 |
| 99 | + } |
| 100 | + } |
| 101 | + // Create "select" operator. |
| 102 | + // inputs: |
| 103 | + // X: All input variables used by operators within the select block |
| 104 | + // case_to_execute: Variable filled in by select_op when it determines |
| 105 | + // which case to execute. |
| 106 | + // |
| 107 | + // outputs: |
| 108 | + // Out: All output variables referenced by operators within select block. |
| 109 | + // |
| 110 | + // attrs: |
| 111 | + // sub_block: The block id containing the select "cases" |
| 112 | + // cases: Serialized list of all cases in the select op. |
| 113 | + // Each case is serialized as: '<index>,<type>,<channel>,<value>' |
| 114 | + // where type is 0 for default, 1 for send, and 2 for receive. |
| 115 | + // No channel and values are needed for default cases. |
| 116 | + ops { |
| 117 | + inputs { |
| 118 | + parameter: "X" |
| 119 | + arguments: "fill_constant_103.tmp_0" |
| 120 | + arguments: "fill_constant_104.tmp_0" |
| 121 | + } |
| 122 | + inputs { |
| 123 | + parameter: "case_to_execute" |
| 124 | + arguments: "fill_constant_110.tmp_0" |
| 125 | + } |
| 126 | + outputs { |
| 127 | + parameter: "Out" |
| 128 | + arguments: "fill_constant_110.tmp_0" |
| 129 | + } |
| 130 | + type: "select" |
| 131 | + attrs { |
| 132 | + name: "sub_block" |
| 133 | + type: BLOCK |
| 134 | + block_idx: 1 |
| 135 | + } |
| 136 | + attrs { |
| 137 | + name: "cases" |
| 138 | + type: STRINGS |
| 139 | + strings: "0,1,channel_101,fill_constant_109.tmp_0" |
| 140 | + strings: "1,2,channel_102,fill_constant_108.tmp_0" |
| 141 | + } |
| 142 | + } |
| 143 | + ... |
| 144 | +} |
| 145 | +``` |
| 146 | + |
| 147 | +The python select API will add the **select_op** to the current block. In addition, it will |
| 148 | +iterate through all it's case statements and add any input variables required by case statements |
| 149 | +into **X**. It will also create a temp variable called **case_to_execute**. This variable is |
| 150 | +filled in by the select_op after it has completed processing the case statements. |
| 151 | + |
| 152 | +If there are no available cases to execute (ie: all cases are blocked on channel operations, and |
| 153 | +there is no default statement), then the select_op will block the current thread. The thread will |
| 154 | +unblock once there is a channel operation affecting one of the case statements, at which point, the |
| 155 | +**select_op** will set the **case_to_execute** variable to the index of the case to execute. |
| 156 | + |
| 157 | +Finally the select_op will call executor.run on the **sub_block**. |
| 158 | + |
| 159 | +``` |
| 160 | +blocks { |
| 161 | + idx: 1 |
| 162 | + parent_idx: 0 |
| 163 | + ... |
| 164 | + // Fill a tensor with the case index (ie: 0,1,2,3,ect.) |
| 165 | + ops { |
| 166 | + outputs { |
| 167 | + parameter: "Out" |
| 168 | + arguments: "fill_constant_111.tmp_0" |
| 169 | + } |
| 170 | + type: "fill_constant" |
| 171 | + attrs { |
| 172 | + name: "force_cpu" |
| 173 | + type: BOOLEAN |
| 174 | + b: false |
| 175 | + } |
| 176 | + attrs { |
| 177 | + name: "value" |
| 178 | + type: FLOAT |
| 179 | + f: 0.0 |
| 180 | + } |
| 181 | + attrs { |
| 182 | + name: "shape" |
| 183 | + type: INTS |
| 184 | + ints: 1 |
| 185 | + } |
| 186 | + attrs { |
| 187 | + name: "dtype" |
| 188 | + type: INT |
| 189 | + i: 2 |
| 190 | + } |
| 191 | + } |
| 192 | + // Create an "equal" operator to compare the case index with the "case_to_execute" |
| 193 | + // tensor (which was filled in by the select op). |
| 194 | + ops { |
| 195 | + inputs { |
| 196 | + parameter: "X" |
| 197 | + arguments: "fill_constant_111.tmp_0" // case 0 |
| 198 | + } |
| 199 | + inputs { |
| 200 | + parameter: "Y" |
| 201 | + arguments: "fill_constant_110.tmp_0" // case_to_execute |
| 202 | + } |
| 203 | + outputs { |
| 204 | + parameter: "Out" |
| 205 | + arguments: "equal_0.tmp_0" |
| 206 | + } |
| 207 | + type: "equal" |
| 208 | + attrs { |
| 209 | + name: "axis" |
| 210 | + type: INT |
| 211 | + i: -1 |
| 212 | + } |
| 213 | + } |
| 214 | + // Use the output of the "equal" operator as a condition for the "conditional_block". |
| 215 | + // If the condition evaluates to true, then execute the "sub_block" (which represents |
| 216 | + // the select case's body) |
| 217 | + ops { |
| 218 | + inputs { |
| 219 | + parameter: "Params" |
| 220 | + } |
| 221 | + inputs { |
| 222 | + parameter: "X" |
| 223 | + arguments: "equal_0.tmp_0" |
| 224 | + } |
| 225 | + outputs { |
| 226 | + parameter: "Out" |
| 227 | + } |
| 228 | + outputs { |
| 229 | + parameter: "Scope" |
| 230 | + arguments: "_generated_var_0" |
| 231 | + } |
| 232 | + type: "conditional_block" |
| 233 | + attrs { |
| 234 | + name: "is_scalar_condition" |
| 235 | + type: BOOLEAN |
| 236 | + b: true |
| 237 | + } |
| 238 | + attrs { |
| 239 | + name: "sub_block" |
| 240 | + type: BLOCK |
| 241 | + block_idx: 4 |
| 242 | + } |
| 243 | + } |
| 244 | + ... |
| 245 | + // Repeat the above operators for each case statements inside the select body |
| 246 | +} |
| 247 | +
|
| 248 | +``` |
| 249 | + |
| 250 | +Cases are represented by a **conditional_block operator**, whose's condition is set as the output of |
| 251 | +equal(**case_to_execute**, **case_index**). Since each case index is unique in this sub-block, |
| 252 | +only one case will be executed. |
| 253 | + |
| 254 | +### select_op flow |
| 255 | + |
| 256 | +<p align="center"> |
| 257 | +<img src="./images/select_op_workflow.png"/><br/> |
| 258 | +</p> |
| 259 | + |
| 260 | +The select algorithm is inspired by golang's select routine. Please refer to |
| 261 | +http://www.tapirgames.com/blog/golang-concurrent-select-implementation for more information. |
| 262 | + |
| 263 | +## Backward Pass |
| 264 | + |
| 265 | +TODO |
0 commit comments