16
16
#include " paddle/fluid/framework/details/container_cast.h"
17
17
#include " paddle/fluid/framework/details/reduce_and_gather.h"
18
18
#include " paddle/fluid/framework/details/variable_visitor.h"
19
+ #include " paddle/fluid/framework/operator.h"
20
+
21
+ #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
22
+ #include " dgc/dgc.h"
23
+ #endif
24
+
25
+ #include " paddle/fluid/platform/gpu_info.h"
19
26
#include " paddle/fluid/platform/profiler.h"
20
27
21
28
// asynchronous nccl allreduce or synchronous issue:
@@ -33,11 +40,14 @@ namespace details {
33
40
AllReduceOpHandle::AllReduceOpHandle (ir::Node *node,
34
41
const std::vector<Scope *> &local_scopes,
35
42
const std::vector<platform::Place> &places,
36
- const platform::NCCLContextMap *ctxs)
43
+ const platform::NCCLContextMap *ctxs,
44
+ bool is_encoded, int nranks)
37
45
: OpHandleBase(node),
38
46
local_scopes_(local_scopes),
39
47
places_(places),
40
- nccl_ctxs_(ctxs) {
48
+ nccl_ctxs_(ctxs),
49
+ is_encoded_(is_encoded),
50
+ nranks_(nranks) {
41
51
if (nccl_ctxs_) {
42
52
for (auto &p : places_) {
43
53
this ->SetDeviceContext (p, nccl_ctxs_->DevCtx (p));
@@ -51,7 +61,185 @@ AllReduceOpHandle::AllReduceOpHandle(ir::Node *node,
51
61
: OpHandleBase(node), local_scopes_(local_scopes), places_(places) {}
52
62
#endif
53
63
64
+ #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
65
+ void AllReduceOpHandle::RunImplEncoded () {
66
+ platform::RecordEvent record_event (Name ());
67
+
68
+ WaitInputVarGenerated ();
69
+
70
+ auto in_var_handles = DynamicCast<VarHandle>(this ->Inputs ());
71
+ auto out_var_handles = DynamicCast<VarHandle>(this ->Outputs ());
72
+ PADDLE_ENFORCE_EQ (
73
+ in_var_handles.size (), places_.size (),
74
+ " The NoDummyInputSize should be equal to the number of places." );
75
+ PADDLE_ENFORCE_EQ (
76
+ in_var_handles.size (), out_var_handles.size (),
77
+ " The NoDummyInputSize and NoDummyOutputSize should be equal." );
78
+
79
+ std::vector<const LoDTensor *> ins;
80
+ std::vector<LoDTensor *> outs;
81
+ int k = -1 ;
82
+ for (size_t i = 0 ; i < local_scopes_.size (); ++i) {
83
+ auto &local_scope =
84
+ local_scopes_[i]->FindVar (kLocalExecScopeName )->Get <Scope *>();
85
+ auto original_name =
86
+ paddle::framework::GradOriginalVarName (in_var_handles[i]->name ());
87
+ auto encode_var_name = original_name + g_dgc_encoded;
88
+ auto *in_var = local_scope->FindVar (encode_var_name);
89
+ PADDLE_ENFORCE_NOT_NULL (in_var);
90
+ auto &in = in_var->Get <LoDTensor>();
91
+ ins.emplace_back (&in);
92
+
93
+ auto *out = local_scope->FindVar (out_var_handles[i]->name ())
94
+ ->GetMutable <LoDTensor>();
95
+ outs.emplace_back (out);
96
+
97
+ if (k < 0 ) {
98
+ k = GetKValue (in_var_handles[i]->name ());
99
+ }
100
+ }
101
+
102
+ PADDLE_ENFORCE (platform::is_gpu_place (ins[0 ]->place ()));
103
+ PADDLE_ENFORCE (platform::is_gpu_place (outs[0 ]->place ()));
104
+ PADDLE_ENFORCE (nccl_ctxs_, " nccl_ctxs should not be nullptr." );
105
+
106
+ int dtype = -1 ;
107
+ size_t in_numel = 0 ;
108
+ size_t out_numel = 0 ;
109
+ PADDLE_ENFORCE (nranks_ > 1 );
110
+ std::vector<std::function<void ()>> all_reduce_calls;
111
+
112
+ for (size_t i = 0 ; i < local_scopes_.size (); ++i) {
113
+ auto &place = places_[i];
114
+ auto &in = *ins[i];
115
+ void *in_tensor_buf = const_cast <void *>(in.data <void >());
116
+
117
+ auto &out = *outs[i];
118
+ float *out_tensor_buf = out.data <float >();
119
+
120
+ dtype = (dtype == -1 ) ? platform::ToNCCLDataType (in.type ()) : dtype;
121
+ in_numel = (in_numel == 0 ) ? static_cast <size_t >(in.numel ()) : in_numel;
122
+ PADDLE_ENFORCE (in_numel % 2 == 0 );
123
+ PADDLE_ENFORCE (in_numel / 2 == static_cast <size_t >(k));
124
+ out_numel = (out_numel == 0 ) ? static_cast <size_t >(out.numel ()) : out_numel;
125
+
126
+ int dev_id = boost::get<platform::CUDAPlace>(place).device ;
127
+ auto &nccl_ctx = nccl_ctxs_->at (dev_id);
128
+ auto stream = nccl_ctx.stream ();
129
+ auto comm = nccl_ctx.comm_ ;
130
+
131
+ auto &allocator =
132
+ platform::DeviceTemporaryAllocator::Instance ().Get (place, stream);
133
+ int encode_size = 2 * k * sizeof (int );
134
+ // dgc use ncclAllGather to get all the encoded data
135
+ // so the buffer need nranks.
136
+ int buf_size = nranks_ * encode_size;
137
+ auto tmp_ious_data = allocator.Allocate (buf_size);
138
+ void *gather_buff = reinterpret_cast <void *>(tmp_ious_data->ptr ());
139
+
140
+ VLOG (10 ) << " in_numel:" << in_numel << " , out_numel:" << out_numel
141
+ << " , nranks:" << nranks_ << " , gather_buf size:" << buf_size
142
+ << " , k:" << k << " , place:" << place << " , dtype:" << dtype;
143
+
144
+ all_reduce_calls.emplace_back ([=] {
145
+ PADDLE_ENFORCE (paddle::communication::dgc::sparseAllGReduce (
146
+ in_tensor_buf, gather_buff, k, out_tensor_buf, out_numel, comm,
147
+ stream));
148
+ });
149
+ }
150
+
151
+ this ->RunAndRecordEvent ([&] {
152
+ if (all_reduce_calls.size () == 1UL ) {
153
+ // Do not use NCCLGroup when manage NCCL by per thread per device
154
+ all_reduce_calls[0 ]();
155
+ } else {
156
+ platform::NCCLGroupGuard guard;
157
+ for (auto &call : all_reduce_calls) {
158
+ call ();
159
+ }
160
+ }
161
+ });
162
+
163
+ if (FLAGS_sync_nccl_allreduce) {
164
+ for (auto &p : places_) {
165
+ int dev_id = boost::get<platform::CUDAPlace>(p).device ;
166
+ auto &nccl_ctx = nccl_ctxs_->at (dev_id);
167
+ auto stream = nccl_ctx.stream ();
168
+ cudaError_t e_sync = cudaStreamSynchronize (stream);
169
+ if (e_sync != 0 ) {
170
+ LOG (FATAL) << " cudaStreamSynchronize " << cudaGetErrorString (e_sync);
171
+ }
172
+
173
+ cudaError_t e_get = cudaGetLastError ();
174
+ if (e_get != 0 ) {
175
+ LOG (FATAL) << " cudaGetLastError " << cudaGetErrorString (e_get)
176
+ << " errno:" << e_get;
177
+ }
178
+ }
179
+ }
180
+ }
181
+
182
+ int AllReduceOpHandle::GetKValue (const std::string &grad_name) {
183
+ auto original_name = paddle::framework::GradOriginalVarName (grad_name);
184
+ auto var_name = original_name + g_dgc_k;
185
+ PADDLE_ENFORCE (local_scopes_.size () > 0 );
186
+
187
+ auto *scope = local_scopes_[0 ];
188
+ auto &local_scope = scope->FindVar (kLocalExecScopeName )->Get <Scope *>();
189
+ auto var = local_scope->FindVar (var_name);
190
+ PADDLE_ENFORCE_NOT_NULL (var);
191
+ auto tensor = var->Get <LoDTensor>().data <float >();
192
+ return *tensor;
193
+ }
194
+ #endif
195
+
196
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
// Decides at runtime whether the sparse (DGC) all-reduce path should run.
// Returns true only when this handle was constructed with is_encoded_ set
// and the current step counter has reached the DGC ramp-up begin step.
// Both values are read from single-float tensors in the first local scope.
bool AllReduceOpHandle::IsEncoded() {
  if (!is_encoded_) {
    return false;
  }
  auto counter_name = g_dgc_counter_name;
  auto step_name = g_dgc_rampup_begin_step;
  PADDLE_ENFORCE(local_scopes_.size() > 0);

  auto *scope = local_scopes_[0];
  auto &local_scope = scope->FindVar(kLocalExecScopeName)->Get<Scope *>();
  auto count_var = local_scope->FindVar(counter_name);
  auto step_var = local_scope->FindVar(step_name);
  if (count_var == nullptr || step_var == nullptr) {
    // FIX: pass the variable *name* (step_name) to the %s specifier.
    // The original passed step_var — a Variable pointer — which would
    // format the pointer instead of the name in the error message.
    PADDLE_THROW("not find count_var:%s or step_var:%s", counter_name,
                 step_name);
  }

  float count = *count_var->Get<LoDTensor>().data<float>();
  float step = *step_var->Get<LoDTensor>().data<float>();
  if (static_cast<int>(count) < static_cast<int>(step)) {
    VLOG(10) << "in all_reduce currentstep:" << count
             << " < rampup_begin_step:" << step
             << " so not use sparse all reduce";
    return false;
  }

  return true;
}
#else
// Without CUDA the encoded (DGC) path is never available.
bool AllReduceOpHandle::IsEncoded() { return false; }
#endif
228
+
54
229
void AllReduceOpHandle::RunImpl () {
230
+ if (!IsEncoded ()) {
231
+ RunImplNormal ();
232
+ return ;
233
+ }
234
+
235
+ #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
236
+ RunImplEncoded ();
237
+ #else
238
+ PADDLE_THROW (" Not compiled with CUDA" );
239
+ #endif
240
+ }
241
+
242
+ void AllReduceOpHandle::RunImplNormal () {
55
243
platform::RecordEvent record_event (Name ());
56
244
57
245
WaitInputVarGenerated ();
@@ -72,6 +260,8 @@ void AllReduceOpHandle::RunImpl() {
72
260
auto &lod_tensor =
73
261
local_scope.FindVar (in_var_handles[i]->name ())->Get <LoDTensor>();
74
262
lod_tensors.emplace_back (&lod_tensor);
263
+ VLOG (10 ) << " place:" << i << " , input_name:" << in_var_handles[i]->name ()
264
+ << " , out_name:" << out_var_handles[i]->name ();
75
265
PADDLE_ENFORCE_EQ (in_var_handles[i]->name (), out_var_handles[i]->name (),
76
266
" The name of input and output should be equal." );
77
267
}
@@ -99,13 +289,17 @@ void AllReduceOpHandle::RunImpl() {
99
289
auto &nccl_ctx = nccl_ctxs_->at (dev_id);
100
290
auto stream = nccl_ctx.stream ();
101
291
auto comm = nccl_ctx.comm_ ;
292
+
293
+ VLOG (10 ) << " before all reduce buffer:" << buffer << " , numel:" << numel
294
+ << " , dev_id:" << dev_id << " , dtype:" << dtype
295
+ << " , place:" << p;
296
+
102
297
all_reduce_calls.emplace_back ([=] {
103
298
PADDLE_ENFORCE (platform::dynload::ncclAllReduce (
104
299
buffer, buffer, numel, static_cast <ncclDataType_t>(dtype), ncclSum,
105
300
comm, stream));
106
301
});
107
302
}
108
-
109
303
this ->RunAndRecordEvent ([&] {
110
304
if (all_reduce_calls.size () == 1UL ) {
111
305
// Do not use NCCLGroup when manage NCCL by per thread per device
0 commit comments