# FuseRecv

| Status        | (Proposed / Accepted / Implemented / Obsolete)       |
| :------------ | :---------------------------------------------------- |
| **Author(s)** | Tongxuan Liu ([email protected]), Peng Tao ([email protected]), Langshi Chen ([email protected]) |
| **Sponsor**   | i |
| **Updated**   | 2020-04-09 |

## Objective
This RFC proposes a new FuseRecv op that receives multiple tensors of
different types through a single RPC. This feature can significantly reduce
the number of RPC calls in most ranking or matching models used in search,
recommendation, or advertising systems.

## Motivation
When many small tensors are transferred around the same time, it is more
efficient to transfer multiple values in a single RPC rather than using a
separate RPC for each.

When the neural network graph is complicated, each iteration through the
graph may introduce tens or even hundreds of RPC calls between the running
nodes. In general there are a large number of small tensors, such as multiple
feature columns gathered from the same parameter server that have no
dependence on each other, and each feature column results in at least one RPC
call in the forward stage. In a CTR (Click Through Rate) model or most sparse
models (matching or ranking models widely used in recommendation or
advertising systems), there are hundreds of feature columns (in our scenario,
each sample includes at least hundreds of features). One job normally uses
thousands of workers and tens of parameter servers. One worker generally has
to gather or get variables from all the parameter servers, and each feature
column issues at least one request to a parameter server in the forward
stage. There could be hundreds of RPCs for these feature columns, and some
big feature columns (such as ids) may be partitioned, resulting in dozens of
RPCs for a single feature column. In summary, there are at least hundreds of
RPCs per worker for these feature columns alone, and hundreds * thousands of
RPCs hitting each parameter server in the forward stage of one step. Most
feature columns only gather very small amounts of data from the parameter
server, normally less than 100KB. Logically these small tensors could be sent
together. Furthermore, tensors that belong to the same layer can also be
fused, which would significantly reduce the number of RPC calls.

As we know, each RPC call introduces some ancillary overhead besides the
actual tensor value transfer, including:
* Serialization/deserialization overhead for each RPC operation.
* The execution engine overhead for executing a Recv node, and the corresponding thread pool action to execute the RPC callback.

## User Benefit

Performance improvement: based on feedback on this feature, in a large
training job (> 400 workers), training speed is normally 1.5-2x faster in
ps-worker mode.

## Design Proposal

In the original design of Recv/Send, each Recv node receives only one tensor,
even when multiple Recv ops feed the same destination op. Moreover, each Recv
node triggers one RPC call even if the received tensor is a scalar.

In this design, we traverse the partitioned graphs and replace Recv nodes
with FuseRecv nodes according to the graph topology, iteratively searching
for and fusing candidate Recv nodes.

As shown in Figure 1 and 2, instead of adding a Recv node for each of the
tensors ‘a’ and ‘x’, we use one FuseRecv node to replace the two Recv nodes
and fetch both tensors together. The FuseRecv node has two output ‘slots’
(‘ports’): slot 0 feeds inputs ‘b’ and ‘c’, and slot 1 feeds ‘y’. Notice that
there is no need to fuse the Send nodes, because the RPC call is Recv driven.

A new RPC method ‘FuseRecvTensorAsync’ and its handler
(FuseRecvTensorHandlerRaw) are added to WorkerInterface and
WorkerService. FuseRecvTensor keeps optimizations similar to those of
RecvTensor to avoid copying the response buffer.

### Alternatives Considered
#### Fuse the tensors into a single Send/Recv, Solution 1 (Derek Murray)
Pack the N tensors to be sent into a length-N DT_VARIANT vector.

Pros: Reuses the current RPC, avoiding potentially intricate changes in the
zero-copy response buffer code.
Cons: Introduces memcpy overhead.

#### Fuse the tensors into a single Send/Recv, Solution 2 (Derek Murray)
Pack the tensor contents into a single flattened buffer. This would be very
similar to the ScopedAllocator optimization that [email protected] and
[email protected] implemented for collectives, and it might be possible to
reuse some of the graph analysis code.

Pros: Reuses the current RPC, avoiding potentially intricate changes in the
zero-copy response buffer code.
Cons: The fused tensors could have different types and dynamic shapes, which
this solution cannot handle.

#### Dynamic fusion at runtime (Paul Tucker)
Instead of adding a new FuseRecvTensor method to the Worker interface,
we add a slightly different RecvSomeTensors method. The client sends the
server a list of keys for which it is ready to receive values, and the
server streams back one or more when it is ready. It is the responsibility of
the client to retry any key that was not included in the response.

To make this work well there needs to be some dynamic bundling on each side.
For example, on the client side a call to RecvTensor on the local Rendezvous
for a remote value does not necessarily result in an immediate RPC. It might
if the value is expected to be large, but it might also just add the key to
a ready set associated with the remote host. An RPC may not be sent until
the ready set reaches a certain size, or a minimum time has elapsed since the
last RPC against that host was started. When the response is received, any
missing keys go back into the ready set.

On the server side there could be some logic to decide, for a RecvSomeTensors
call, whether to wait for more of the requested values to become ready or to
immediately send what is available now and let the client re-request anything
missing.
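
The following self-contained sketch illustrates the client-side ready-set
bundling described above. The class, thresholds, and callback shape are
assumptions for illustration only, not an actual TensorFlow API: keys for
small values accumulate per remote host until either a size threshold or a
time window is reached, and only then does one batched RPC go out.

```
#include <chrono>
#include <cstddef>
#include <functional>
#include <set>
#include <string>

// Client-side bundler for a hypothetical RecvSomeTensors protocol: keys for
// small remote values accumulate in a per-host ready set and are flushed as
// one RPC once a size or time threshold is hit.
class ReadySetBundler {
 public:
  using Clock = std::chrono::steady_clock;
  using FlushFn = std::function<void(const std::set<std::string>&)>;

  ReadySetBundler(size_t max_keys, std::chrono::milliseconds max_wait,
                  FlushFn flush)
      : max_keys_(max_keys), max_wait_(max_wait), flush_(std::move(flush)),
        last_flush_(Clock::now()) {}

  // Called when the local Rendezvous wants a (small) remote value.
  void AddKey(const std::string& rendezvous_key) {
    ready_.insert(rendezvous_key);
    if (ready_.size() >= max_keys_ ||
        Clock::now() - last_flush_ >= max_wait_) {
      flush_(ready_);  // issue one RecvSomeTensors RPC for the whole batch
      ready_.clear();
      last_flush_ = Clock::now();
    }
  }

  // Keys the server did not return go back into the ready set for a retry.
  void Requeue(const std::set<std::string>& missing) {
    ready_.insert(missing.begin(), missing.end());
  }

 private:
  size_t max_keys_;
  std::chrono::milliseconds max_wait_;
  FlushFn flush_;
  std::set<std::string> ready_;
  Clock::time_point last_flush_;
};
```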

Pros: Dynamic fusion at runtime could yield better results, and it also
brings the ability to control the priority of tensors (i.e., which Recv is
more important).
Cons: A potential bottleneck of this solution is the time window of the ready
set. It would differ greatly between models, and setting the value manually
would be hard. This solution remains another good candidate alongside
FuseRecv.

### Performance Implications
With a wide and deep model, the number of RPC calls per step has been reduced
by 55%, and the overall training throughput has increased by 40%.

### Dependencies
* None

### Engineering Impact
* Engineering impact: Once the feature is manually enabled (via ConfigProto.GraphOptions.do_fuse_recv), test times would be slower because the FuseRecv post-partitioning optimizer traverses and updates the graph.
* Maintenance: Minimal maintenance overhead. The TensorFlow team and contributors will keep the documentation up to date. Changes should be reviewed and approved by the TensorFlow team leads.

### Platforms and Environments
* Platforms: The feature is independent of platform.
* Execution environments (Cloud services, accelerator hardware): The first stage supports CPU and GPU devices. We will consider supporting as many additional devices as possible.

### Best Practices
* We strongly suggest enabling FuseRecv for ranking or matching models such as [W&DL](https://arxiv.org/abs/1606.07792) and [DIEN](https://arxiv.org/abs/1809.03672).

### Tutorials and Examples
Example of enabling the FuseRecv feature:

```
config = tf.ConfigProto()
config.graph_options.optimizer_options.experimental.do_fuse_recv = True
```

### Compatibility
* This feature works with ParameterServerStrategy.
* This feature considers tensors on different devices such as CPU, GPU and TPU.
* It is independent of SavedModel and checkpoints.

### User Impact
* None

## Detailed Design

### FuseRecv Op
We introduce the FuseRecv op and an RPC call named FuseRecvTensorAsync in
RemoteWorker and WorkerService. The FuseRecv op is defined as follows:

```
REGISTER_OP("FuseRecv")
    .Output("tensor: tensor_type")
    .Attr("tensor_type: list(type)")
    .Attr("tensor_name: list(string)")
    .Attr("send_device: list(string)")
    .Attr("send_device_incarnation: list(int)")
    .Attr("recv_device: list(string)")
    .Attr("client_terminated: bool = false")
    .SetIsStateful()
    .SetShapeFn(shape_inference::UnknownShape);
```

FuseRecv requests a list of tensors with different types from a remote
worker. Generally, we only fuse Recv ops that share the same recv device and
the same send device.
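
To make the parallel list attributes concrete, the sketch below shows how one
rendezvous key per fused tensor could be derived from the op's attrs. It is
illustrative only: the device strings, incarnation values, tensor names, and
the exact key layout are assumptions, loosely following TensorFlow's
"src;incarnation;dst;tensor_name;frame:iter" rendezvous-key convention.

```
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Build one rendezvous key per fused tensor from the FuseRecv op's parallel
// list attributes. The key layout only approximates TensorFlow's convention.
std::vector<std::string> BuildFusedRendezvousKeys(
    const std::vector<std::string>& tensor_name,
    const std::vector<std::string>& send_device,
    const std::vector<uint64_t>& send_device_incarnation,
    const std::vector<std::string>& recv_device) {
  std::vector<std::string> keys;
  for (size_t i = 0; i < tensor_name.size(); ++i) {
    keys.push_back(send_device[i] + ";" +
                   std::to_string(send_device_incarnation[i]) + ";" +
                   recv_device[i] + ";" + tensor_name[i] + ";0:0");
  }
  return keys;
}

int main() {
  // Hypothetical values: two small tensors fused from the same PS task.
  auto keys = BuildFusedRendezvousKeys(
      {"edge_1_a", "edge_2_x"},
      {"/job:ps/replica:0/task:0/device:CPU:0",
       "/job:ps/replica:0/task:0/device:CPU:0"},
      {1, 1},
      {"/job:worker/replica:0/task:0/device:CPU:0",
       "/job:worker/replica:0/task:0/device:CPU:0"});
  for (const auto& k : keys) std::cout << k << "\n";  // one key per output slot
}
```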

### FuseRecv Optimizer in Grappler
In the post-partitioning phase, we add a new pass to the post-partitioning
optimizer called “FuseRecv” to fuse Recv ops together. We traverse the
partitioned graphs and the whole graph, replacing Recv ops with FuseRecv ops
in the partitioned graphs according to the graph topology while iteratively
searching for and fusing candidate Recv operations.

*(Pseudocode listing of the RECVFUSE procedure omitted.)*

The procedure RECVFUSE takes two input arguments: 1) the TF computation
graph g, and 2) a partitioned graph. It is worth noting that the iteration
over all nodes at line 11 starts from `root` nodes, which do not have any
source edge (node). The process between lines 17 and 37 is executed
iteratively until all of the nodes have been moved from `nodes` to
`nodes_done`. RECVFUSE also makes sure that no deadlock exists after the
change to the original graph. Also, the RPC call of FuseRecvTensor overlaps
computation and communication by using the topology of the graph.
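
Since the RECVFUSE listing is not reproduced here, the following
self-contained sketch illustrates the core grouping-and-rewiring idea on a
simplified graph representation. The Node/Edge/Graph types, the "_Recv" op
string, and the grouping key are assumptions for illustration, not
TensorFlow's actual Graph API; a real pass must additionally respect the
topological and deadlock constraints described above.

```
#include <map>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-ins for graph types; NOT TensorFlow's actual Graph API.
struct Node {
  std::string name, op;                   // e.g. op == "_Recv" or "FuseRecv"
  std::string send_device, recv_device;   // relevant for Recv-like nodes
  std::vector<std::string> tensor_names;  // one entry per output slot
};

struct Edge {
  std::string src;  int src_slot;  // producer node and output slot
  std::string dst;  int dst_slot;  // consumer node and input slot
};

struct Graph {
  std::vector<Node> nodes;
  std::vector<Edge> edges;
};

// One (extremely simplified) fusion round: group Recv nodes that share the
// same (send_device, recv_device) pair, replace each group of two or more
// with a single FuseRecv node, and rewire consumers to its output slots.
void FuseRecvPass(Graph* g) {
  std::map<std::pair<std::string, std::string>, std::vector<int>> groups;
  for (int i = 0; i < static_cast<int>(g->nodes.size()); ++i) {
    if (g->nodes[i].op == "_Recv") {
      groups[{g->nodes[i].send_device, g->nodes[i].recv_device}].push_back(i);
    }
  }
  for (const auto& kv : groups) {
    const std::vector<int>& members = kv.second;
    if (members.size() < 2) continue;  // nothing to fuse for this device pair
    Node fused;
    fused.op = "FuseRecv";
    fused.name = "fuse_recv/" + kv.first.first + "/" + kv.first.second;
    fused.send_device = kv.first.first;
    fused.recv_device = kv.first.second;
    std::map<std::string, int> slot_of;  // original Recv name -> new slot
    for (int idx : members) {
      slot_of[g->nodes[idx].name] =
          static_cast<int>(fused.tensor_names.size());
      fused.tensor_names.push_back(g->nodes[idx].tensor_names.front());
    }
    // Rewire every consumer of a fused Recv to the FuseRecv output slot.
    for (Edge& e : g->edges) {
      auto it = slot_of.find(e.src);
      if (it != slot_of.end()) {
        e.src = fused.name;
        e.src_slot = it->second;
      }
    }
    // A real pass would also erase the original Recv nodes and verify that
    // the fusion cannot introduce a cycle or a cross-partition deadlock.
    g->nodes.push_back(fused);
  }
}
```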

### FuseRecv RPC Method and Handler
A new RPC method ‘FuseRecvTensorAsync’ is added to the WorkerInterface.
We extend this method with the ability to handle multiple rendezvous keys
and fetch the corresponding tensors in one call.
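
A rough sketch of what the extended interface could look like is shown
below. The request/response shapes, placeholder types, and callback signature
are assumptions for illustration; the real WorkerInterface works with
TensorFlow's request/response protos, CallOptions, and Status types.

```
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

// Placeholder types standing in for TensorFlow's real classes.
struct Status {};
struct Tensor {};
using StatusCallback = std::function<void(const Status&)>;

// Hypothetical request/response shapes for the fused recv: one rendezvous
// key per requested tensor, one tensor (plus dead flag) per key in reply.
struct FuseRecvTensorRequest {
  int64_t step_id = 0;
  std::vector<std::string> rendezvous_keys;
};
struct FuseRecvTensorResponse {
  std::vector<Tensor> tensors;  // aligned with rendezvous_keys
  std::vector<bool> is_dead;    // per-tensor dead flags, aligned as well
};

// Sketch of the extended worker interface; only the new method is shown.
class WorkerInterface {
 public:
  virtual ~WorkerInterface() = default;
  virtual void FuseRecvTensorAsync(const FuseRecvTensorRequest* request,
                                   FuseRecvTensorResponse* response,
                                   StatusCallback done) = 0;
};
```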

At the server side, we add a ‘FuseRecvTensorHandlerRaw’, which handles
the multiple rendezvous keys for the ‘local recv’ instantiated by the local
tensor operations. As mentioned before, the Send nodes are not fused,
so we have to perform multiple local recvs corresponding to the multiple
Send nodes.

Because the ‘FuseRecvTensorAsync’ handler might be executed before the
Send operations happen, a callback wrapper is required. We use a counter
initialized with the fuse count; each Send action triggers the callback
wrapper and atomically decrements the counter. When the counter reaches 0,
the real callback is executed and the tensors are sent back to the recv
side.
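
A minimal, self-contained sketch of this counting callback wrapper is given
below. The class name and structure are assumptions for illustration; the
actual handler works against TensorFlow's Rendezvous and gRPC response
machinery.

```
#include <atomic>
#include <functional>
#include <iostream>
#include <memory>

// Fires `done` exactly once, after Notify() has been called `fuse_count`
// times (i.e. once per fused tensor delivered by a local Send).
class FusedRecvCallbackWrapper {
 public:
  FusedRecvCallbackWrapper(int fuse_count, std::function<void()> done)
      : pending_(fuse_count), done_(std::move(done)) {}

  void Notify() {
    // The last decrement (reaching zero) triggers the real callback, which
    // would serialize all collected tensors into one RPC response.
    if (pending_.fetch_sub(1, std::memory_order_acq_rel) == 1) done_();
  }

 private:
  std::atomic<int> pending_;
  std::function<void()> done_;
};

int main() {
  auto wrapper = std::make_shared<FusedRecvCallbackWrapper>(
      3, [] { std::cout << "all 3 fused tensors ready, send response\n"; });
  // Each local Send completion would call Notify(), possibly from different
  // threads; only the third call fires the response callback.
  wrapper->Notify();
  wrapper->Notify();
  wrapper->Notify();
}
```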

### Dead Tensor Handling
We treat the output of the FuseRecv node as dead if and only if all of the
fused tensors are dead.
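
Expressed as a small check (illustrative only; the per-tensor dead flags
would come from the individual Send results collected by the handler):

```
#include <algorithm>
#include <vector>

// Returns true only when every fused tensor arrived dead, which is the
// condition under which the FuseRecv output is propagated as dead.
bool FuseRecvOutputIsDead(const std::vector<bool>& is_dead) {
  return !is_dead.empty() &&
         std::all_of(is_dead.begin(), is_dead.end(), [](bool d) { return d; });
}
```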

### FuseRecv Error Handling
The status handling of the FuseRecv node is similar to that of the Recv op,
but the status includes information about each of the received tensors.

## Questions and Discussion Topics
