
Commit 5f54a30

jbedorf and liutongxuan authored and committed
Fix irregular writing and expression issues.
1 parent 9cdd68b commit 5f54a30

File tree: 1 file changed, +99 −96 lines


rfcs/20200409-fuse_recv.md

Lines changed: 99 additions & 96 deletions
Original file line number | Diff line number | Diff line change
@@ -8,94 +8,94 @@
88
| **Updated** | 2020-04-09 |
99

1010
## Objective
11-
This RFC proposes a new FuseRecv Op which would recv multiple tensors with
12-
different types through one RPC. This feature could significantly reduce the
13-
number of RPC calls in most rank or match models in Search, Recommend or Ad
14-
System.
11+
This RFC proposes a new FuseRecv Op which would receive multiple tensors with
12+
different types through one Remote Procedure Call (RPC). This feature could
13+
significantly reduce the number of RPC calls in most rank or match models
14+
used in Search, Recommendation, or Ad systems.
1515

1616
## Motivation
1717
When many small tensors are being transferred around the same time,
18-
it's more efficient to transfer multiple values in a single RPC rather than
19-
using a separate RPC for each.
18+
it's more efficient to transfer multiple tensors in a single RPC rather than
19+
using a separate RPC for each of them.
2020

2121
In cases where the neural network graph is complicated, each iteration through
22-
the graph may introduce tens or even hundreds of RPC calls between the running
23-
nodes. In general there are a large number of small tensors, such as multiple
24-
feature columns gather from the same Parameter Server which have no dependence
25-
on each other, and each feature column results in at least one RPC call in
26-
the forward stage. In CTR (Click Through Rate) model or most sparse models
27-
(Match or Rank models widely used in Recommend system or Ad system), there
28-
would be hundreds of feature columns (in our scenario, each sample includes
29-
at least hundreds of features). One job normally have to use thousands of
30-
workers and tens of parameter servers. One worker generally has to gather
31-
or get variables from all the parameter servers, and each feature column
32-
at least in the forward stage has at least one request from the parameter
33-
server. There could be hundreds of RPC for these feature columns,
34-
and even more some big feature columns (such as ids) would be partitioned
35-
could be dozens of RPCs for one feature column. In summary there would be
36-
at least hundreds of RPC per worker only for these feature columns, and
37-
hundreds * thousands RPCs in each parameter server in the forward stage
38-
of one step. Most feature column only gather very small data from parameter
39-
server, normally less than 100KB. Logically these small tensors could be
40-
sent together. Furthermore, tensors that belong to the same layer can also
41-
be fused which would significantly reduce the number of RPC calls.
42-
43-
As we know, each RPC call will introduce some satellite overhead besides the
44-
real tensor value transfer, which includes:
45-
* Serialization/Deserialization introduce extra overhead for each RPC operation.
46-
* The execution engine overhead for executing a recv node, and the corresponding thread pool action to execute the RPC callback.
22+
the graph may introduce tens or even hundreds of RPC operations between the running
23+
nodes. In general, there are a large number of small tensors, such as multiple
24+
feature columns that gather data from the same Parameter Server. These tensors
25+
have no dependence on each other, and each feature column results in at least
26+
one RPC operation in the forward stage. In CTR (Click Through Rate) models or
27+
models that are mostly sparse (such as Match or Rank models that are widely
28+
used in Recommender and Ad systems), there would be hundreds of feature columns.
29+
In our scenario, each sample includes at least hundreds of features.
30+
One training job normally uses thousands of workers and tens of parameter servers.
31+
One worker generally has to get variables from all the parameter servers, and each
32+
feature column, at least in the forward stage, requires at least one request to
33+
the parameter server. There could be hundreds of RPC operations for these feature columns,
34+
and even more for some big feature columns (such as ids), which may be partitioned,
35+
resulting in dozens of RPCs for a single feature column. In summary, there would be
36+
at least hundreds of RPCs per worker for these feature columns alone, and
37+
hundreds of thousands of RPCs per step, for each parameter server in the forward stage.
38+
Most feature columns only gather very small tensors from the parameter
39+
server, usually less than 100KB. Logically these small tensors could be
40+
sent together (i.e., fused). Furthermore, tensors that belong to the same layer can also
41+
be fused before transfer, which would significantly reduce the number of RPC operations.
42+
43+
As we know, each RPC operation introduces some ancillary overhead besides the
44+
actual tensor data transfer, which includes:
45+
* Serialization/Deserialization which introduces additional overhead for each RPC operation.
46+
* The execution engine overhead for executing a Recv node operation, and the corresponding thread pool
47+
action required to execute the RPC callback function.
4748

4849
## User Benefit
4950

50-
Performance improvement: From feedbacks of the feature, in a large traning job
51-
(> 400 workers), normally traning speed would be 1.5-2x timer faster in
52-
ps-worker mode.
51+
Performance improvement: From performance benchmarking of the feature during large
52+
(end-user) training jobs (> 400 workers), we normally see that the training speed would
53+
be 1.5-2x faster in the parameter-server/worker setup.
5354

5455
## Design Proposal
5556

5657
![Figure 1: Current graph partition strategy](20200409-fuse_recv/current_graph_partition_strategy.png "Current graph partition strategy")
5758
![Figure 2: Graph partition strategy with FuseRecv](20200409-fuse_recv/graph_partition_strategy_with_fuse_recv.png "Graph partition strategy with FuseRecv")
5859

59-
In original design of Recv/Send, each Recv node only recv one tensor
60-
even if there're Recv Ops output to same destination Op. Moreover each
61-
Recv node would trigger one RPC call even received tensor is a scalar.
60+
In the original Recv/Send design, each Recv node only receives one tensor
61+
even if there are Recv Ops that output to the same destination Op. Moreover, each
62+
Recv node would trigger one RPC operation even if the received tensor is a scalar.
6263

63-
In this design, we traverse graphs, replace Recv nodes by FuseRecv node in
64-
partitioned graphs according to its topology while iteratively searching
65-
and fusing potential Recv node.
64+
In the proposed design, we traverse (partitioned) graphs according to
65+
their topology and iteratively replace Recv nodes with new FuseRecv nodes.
66+
Please refer to the details in the section [FuseRecv Optimizer in Grappler](#fuserecv-optimizer-in-grappler).
6667

67-
As shown in Figure 1 and 2, instead of adding a Recv node for each tensor
68-
‘a’ and ‘x’, we use one FuseRecv to replace two Recv nodes which fetch two
69-
tensors together. The FuseRecv node will have two output ‘slots’(‘ports’):
70-
slot 0 feeds input ‘b’ and ‘c’ and slot 1 feeds ‘y’. Notice that there is
71-
no need to fuse the send node, because the RPC call is Recv driven.
68+
As shown in Figures 1 and 2, instead of adding a Recv node for each tensor
69+
‘a’ and ‘x’, we use a single FuseRecv node, which fetches both tensors together,
70+
to replace the two Recv nodes. The FuseRecv node will have two output
71+
‘slots’ (‘ports’): slot 0 feeds inputs ‘b’ and ‘c’ and slot 1 feeds ‘y’.
72+
Notice that, because the RPC operation is Recv driven, there is no need
73+
to fuse the send node.
7274

73-
A new RPC method ‘FuseRecvTensorAsync’ and it's
74-
Handler (FuseRecvTensorHandlerRaw) is added into WorkInterface and
75-
WorkerService. FuseRecvTensor keeps similar optimizations as
76-
RecvTensor to avoid copying the repsonse buffer.
75+
A new RPC method ‘FuseRecvTensorAsync’ and its handler (FuseRecvTensorHandlerRaw)
76+
are added to the WorkerInterface and WorkerService. FuseRecvTensor follows similar
77+
optimization steps as RecvTensor to avoid copying the response buffer.
7778

7879
### Alternatives Considered
79-
#### Fuse the tensors into a single Send/Recv Solution 1(Derek Murray)
80+
#### Fuse the tensors into a single Send/Recv Solution 1 (Derek Murray)
8081
Pack the N tensors to be sent into a length-N DT_VARIANT vector.
8182

82-
Cons: Reuse currently RPC, avoid potiential intricate changes in zero-copy
83-
repsonse buffer code.
84-
Pros: Introduce memcopy overhead.
83+
Pros: Reuses the current RPC and avoids potentially intricate changes to the zero-copy
84+
response buffer code.
85+
Cons: Introduces extra memory-copy overhead.
8586

86-
#### Fuse the tensors into a single Send/Recv Solution 2(Derek Murray)
87-
Pack the tensor contents into a single flattened buffer. Pack the tensor
88-
contents into a single flattened buffer. This would be very similar to the
89-
ScopedAllocator optimization that [email protected] and [email protected]
90-
implemented for collectives, and it might be possible to reuse some of
91-
the graph analysis code
87+
#### Fuse the tensors into a single Send/Recv Solution 2 (Derek Murray)
88+
Pack the tensor contents into a single flattened buffer. This would be very
89+
similar to the ScopedAllocator optimization that [email protected] and
90+
[email protected] implemented for collectives, and it might be possible
91+
to reuse some of the graph analysis code.
9292

93-
Cons: Reuse currently RPC, avoid potiential intricate changes in zero-copy
94-
repsonse buffer code.
95-
Pros: The fused tensors could be different types and dynamic shape,
96-
which couldn't be handled by the solution.
93+
Pros: Reuses the current RPC and avoids potentially intricate changes to the zero-copy
94+
response buffer code.
95+
Cons: The fused tensors could be of different types and dynamic shapes,
96+
which couldn't be handled by this solution.
9797

98-
#### Dynamic Fusion in runtime(Paul Tucker)
98+
#### Dynamic Fusion in runtime (Paul Tucker)
9999
Instead of adding a new FuseRecvTensor method to the Worker interface,
100100
we add a slightly different RecvSomeTensors method. The client sends a
101101
list of keys for which it's ready to receive values to the server and the
@@ -107,7 +107,7 @@ For example, on the client side a call to RecvTensor on the local Rendezvous
107107
for a remote value does not necessarily result in an immediate RPC. It might
108108
if the value is expected to be large, but it might also just add the key to
109109
a ready set associated with the remote host. An RPC may not be sent until
110-
the ready set reaches a certain size, or a mininum time has elapsed since the
110+
the ready set reaches a certain size, or a minimum time has elapsed since the
111111
last RPC against that host was started. When the response is received, any
112112
missing keys go back in the ready set.
113113

@@ -116,10 +116,10 @@ method whether to wait for more of the requested values to be ready or just
116116
immediately send what's available now and let the client re-request anything
117117
missing.
118118

119-
Cons: Dynamic fusion in runtime seems get better result, and also brings
119+
Pros: Dynamic fusion at runtime seems to get better results, and also brings the
120120
ability to control the priority of tensors (i.e., which Recv is more important).
121-
Pros: Potential bottleneck of the solution is the time window of ready set.
122-
For different models it would be much different, manually set the value
121+
Cons: A potential bottleneck of this solution is the time window of the ready set.
122+
The window would differ greatly between models, so manually setting the value
123123
would be hard. This solution is another good candidate alongside FuseRecv.
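
To make the ready-set idea concrete, below is a minimal, self-contained sketch (illustrative only; the class and method names are hypothetical and not part of TensorFlow) of a client-side buffer that batches pending recv keys and issues one RPC once a size threshold is reached or a minimum delay has elapsed since the last flush.

```
#include <chrono>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical client-side "ready set": recv keys are buffered and sent in one
// RecvSomeTensors-style RPC once enough keys accumulate or enough time passes.
class ReadySet {
 public:
  ReadySet(size_t max_keys, std::chrono::milliseconds max_delay,
           std::function<void(std::vector<std::string>)> send_rpc)
      : max_keys_(max_keys), max_delay_(max_delay),
        send_rpc_(std::move(send_rpc)),
        last_flush_(std::chrono::steady_clock::now()) {}

  // Called when a local Recv wants a remote tensor identified by `key`.
  void Add(std::string key) {
    std::vector<std::string> batch;
    {
      std::lock_guard<std::mutex> lock(mu_);
      keys_.push_back(std::move(key));
      const auto now = std::chrono::steady_clock::now();
      if (keys_.size() < max_keys_ && now - last_flush_ < max_delay_) {
        return;  // Keep buffering; no RPC yet.
      }
      batch.swap(keys_);
      last_flush_ = now;
    }
    send_rpc_(std::move(batch));  // One RPC carries the whole batch of keys.
  }

 private:
  const size_t max_keys_;
  const std::chrono::milliseconds max_delay_;
  std::function<void(std::vector<std::string>)> send_rpc_;
  std::mutex mu_;
  std::vector<std::string> keys_;
  std::chrono::steady_clock::time_point last_flush_;
};
```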
124124

125125
### Performance Implications
@@ -131,37 +131,38 @@ by 55%, and the overall training throughput has increased by 40%.
131131
* None
132132

133133
### Engineering Impact
134-
* Engineering impact: Once manually enable the feature (in ConfigProto.GraphOptions.do_fuse_recv), test times would be slower because FuseRecv post-partitioned optimizer would traverse and update graph.
135-
* Maintenance: Minimal maintennace overhead. The TensorFlow team and contributors will maintain the documentation up to date. Changes should be reviewed and approved by the TensorFlow team leads.
134+
* Engineering impact: Once the feature is (manually) enabled (in ConfigProto.GraphOptions.do_fuse_recv), the test times would be longer because the FuseRecv post-partitioned optimizer would traverse and update the graph.
135+
* Maintenance: Minimal maintenance overhead. The TensorFlow team and contributors will maintain the documentation and keep it up to date. Changes should be reviewed and approved by the TensorFlow team leads.
136136

137137
### Platforms and Environments
138138
* Platforms: The feature is independent of platforms.
139-
* Execution environments (Cloud services, accelerator hardware): The first stage would support CPU & GPU device. We consider more device supported as much as possible.
139+
* Execution environments (Cloud services, accelerator hardware): The first stage would support CPU & GPU devices. We will consider supporting
140+
additional devices where possible.
140141

141142
### Best Practices
142-
* We strongly suggest enable FuseRecv in rank or match models such as [W&DL](https://arxiv.org/abs/1606.07792), [Dien](https://arxiv.org/abs/1809.03672).
143+
* We strongly suggest enabling FuseRecv in rank or match models such as [W&DL](https://arxiv.org/abs/1606.07792) and [Dien](https://arxiv.org/abs/1809.03672).
143144

144145
### Tutorials and Examples
145-
Example enable FuseRecv feature:
146+
Example of how to enable the FuseRecv feature:
146147

147148
```
148149
>>> config = tf.ConfigProto()
149150
>>> config.graph_options.optimizer_options.experimental.do_fuse_recv = True
150151
```
151152

152153
### Compatibility
153-
* This feature work with ParameterServerStrategy.
154-
* This feature consider tensors on difference devices such as CPU, GPU and TPU.
155-
* Independent with SaveModel or checkpoint.
154+
* This feature works with the ParameterServerStrategy.
155+
* This feature considers tensors on different devices such as CPU, GPU and TPU.
156+
* Independent of SavedModel and checkpoints.
156157

157158
### User Impact
158159
* None
159160

160161
## Detailed Design
161162

162163
### FuseRecv Op
163-
We introduce FuseRecv Op and an RPC call named FuseRecvTensorAsync in
164-
RemoteWorker and WorkerService. FuseRecv Op definitions as follows:
164+
We introduce the FuseRecv Op and an RPC operation named FuseRecvTensorAsync in
165+
RemoteWorker and WorkerService. The FuseRecv Op definition is as follows:
165166

166167
```
167168
>>> REGISTER_OP("FuseRecv")
@@ -176,52 +177,54 @@ RemoteWorker and WorkerService. FuseRecv Op definitions as follows:
176177
>>> .SetShapeFn(shape_inference::UnknownShape);
177178
```
178179

179-
FuseRecv request list of tensors with different types from remote, generally
180-
we only fuse the recv ops in the same recv device and the same send device.
180+
FuseRecv requests a list of tensors with different types from a remote device. Generally,
181+
we only fuse Recv ops that have the same recv device and the same send device.
181182

182183
### FuseRecv Optimizer in Grappler
183-
In post partition phase, we add a new pass in the post-partitioning optimizer
184-
called “FuseRecv” to fuse recv ops together. We traverse partitioned graphs &
185-
whole graph, replace Recv ops by FuseRecv ops in partitioned graphs according
186-
to its topology while iteratively searching and fusing potential recv
187-
operations.
184+
During the post-partition phase, we add a new pass to the post-partitioning optimizer
185+
called “FuseRecv” to fuse Recv ops together. We traverse partitioned graphs &
186+
the whole graph, replacing Recv ops with FuseRecv ops in the partitioned graphs according
187+
to their topology while iteratively searching for and fusing potential Recv
188+
operations. See Figure 4 for the formal algorithm definition.
188189

189190
![Figure 4: fuse_recv_procedure](20200409-fuse_recv/fuse_recv_procedure.png "Fuse Recv Procedure")
190191

191192
The procedure RECVFUSE takes two input arguments: 1) the TF computation
192193
graph g, 2) a partitioned graph. It is worth noting that the iteration of
193-
all nodes at line 11 shall start from `root` nodes, which do not have any
194+
all nodes shall start from the `root` nodes, which do not have any
194195
source edge (node). The process between lines 17 and 37 would be iteratively
195-
executed until all of the nodes are moved from nodes to nodes_done. The
196+
executed, producing key-value pairs where each value is a group of edges that could
197+
be fused into one FuseRecv node. Then, based on the grouped edges, we find the Recv
198+
nodes in the partitioned graph that could be replaced by FuseRecv nodes.
196199
RECVFUSE also makes sure that no deadlock exists after the change to the
197-
original graph. Also, the RPC call of FuseRecvTensor overlaps the computation
198-
and communication by using the topology of the graph.
200+
original graph. Also, the RPC operation of FuseRecvTensor is able to overlap
201+
the computation and communication by using the graph topology.
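
As a rough illustration of the grouping step described above (not the actual Grappler pass; the types and the helper are hypothetical, and the topology and deadlock checks are omitted), the sketch below groups fusible recv edges by their (send device, recv device) pair, so that each group can later be served by a single FuseRecv node.

```
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified view of a cross-device edge in the partitioned graph.
struct RecvEdge {
  std::string send_device;
  std::string recv_device;
  std::string rendezvous_key;  // Identifies the tensor to transfer.
};

// Group edges that share the same (send device, recv device) pair.
// Each resulting group is a candidate to be replaced by one FuseRecv node.
std::map<std::pair<std::string, std::string>, std::vector<RecvEdge>>
GroupFusibleRecvEdges(const std::vector<RecvEdge>& edges) {
  std::map<std::pair<std::string, std::string>, std::vector<RecvEdge>> groups;
  for (const RecvEdge& e : edges) {
    groups[{e.send_device, e.recv_device}].push_back(e);
  }
  return groups;
}
```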
199202

200203
### FuseRecv RPC Method and Handler
201-
A new RPC method ‘FuseRecvTensorAsync’ is added to the WorkerInterface
202-
we extend the ‘FuseRecvTensorAsync’ method with the ability to handle
204+
A new RPC method ‘FuseRecvTensorAsync’ is added to the WorkerInterface.
205+
We extend the ‘FuseRecvTensorAsync’ method with the ability to handle
203206
multiple rendezvous keys and fetch the tensors for multiple keys.
204207

205208
At the server side, we add a ‘FuseRecvTensorHandlerRaw’, which handles
206209
the multiple rendezvous keys for the ‘local recv’ instantiated by the local
207210
tensor operations. As mentioned before, the sending nodes are not fused
208-
and we therefore have to do multiple local recvs corresponding to the
211+
and we therefore must do multiple local recvs corresponding to the
209212
multiple send nodes.
210213

211214
Because the ‘FuseRecvTensorAsync’ handler might be executed before
212215
the send operations happen, a callback wrapper is required. We use
213216
a counter, initialized with the fuse count, and each send action triggers
214217
the callback wrapper and atomically decrements the counter;
215218
when the counter reaches 0, the real callback is executed and the tensors
216-
are sent to the recv node.
219+
are sent to the Recv node.
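
The counter-based callback wrapper can be pictured with the following self-contained sketch (illustrative only, with hypothetical names; not the actual TensorFlow implementation): it is initialized with the fuse count, each completed send decrements the counter atomically, and the real callback fires exactly once when the count reaches zero.

```
#include <atomic>
#include <functional>
#include <utility>

// Hypothetical wrapper around the real FuseRecv response callback. Each of the
// fused send operations calls Notify() once; the wrapped callback runs only
// after all of them have produced their tensors.
class FuseRecvCallbackWrapper {
 public:
  FuseRecvCallbackWrapper(int fuse_count, std::function<void()> done)
      : pending_(fuse_count), done_(std::move(done)) {}

  // Called by each send action when its tensor becomes available.
  void Notify() {
    // fetch_sub returns the previous value; the last notifier sees 1.
    if (pending_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      done_();  // All fused tensors are ready: send the RPC response.
    }
  }

 private:
  std::atomic<int> pending_;
  std::function<void()> done_;
};
```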
217220

218221
### Dead Tensor Handling
219-
We treat output of the FuseRecv node as dead if and only if all the
222+
We treat the output of the FuseRecv node as dead if and only if all the
220223
fused tensors are dead.
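
For clarity, the dead-output rule above amounts to the following check (a sketch with a hypothetical helper name, not the actual implementation):

```
#include <algorithm>
#include <vector>

// The FuseRecv output is treated as dead if and only if every fused tensor
// is dead, per the rule stated above.
bool FuseRecvIsDead(const std::vector<bool>& fused_tensor_is_dead) {
  return std::all_of(fused_tensor_is_dead.begin(), fused_tensor_is_dead.end(),
                     [](bool dead) { return dead; });
}
```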
221224

222225
### FuseRecv Error Handling
223-
Status of FuseRecv node would be similar as Recv op, which include more
224-
informations of every recv tensors.
226+
The status of the FuseRecv node would be similar to that of the Recv node, but
227+
includes additional information for every received tensor.
225228

226229
## Questions and Discussion Topics
227230
