
Commit d9320dc

complete code
1 parent 7237323 commit d9320dc

8 files changed: 73 additions, 15 deletions


paddle/fluid/framework/parallel_executor.cc

Lines changed: 8 additions & 2 deletions
@@ -58,7 +58,7 @@ ParallelExecutor::ParallelExecutor(
     const std::unordered_set<std::string> &bcast_vars,
     const ProgramDesc &main_program, const std::string &loss_var_name,
     Scope *scope, const std::vector<Scope *> &local_scopes, bool allow_op_delay,
-    bool use_default_grad_scale)
+    bool use_default_grad_scale, size_t num_trainers, size_t trainer_id)
     : member_(new ParallelExecutorPrivate(places)) {
   member_->global_scope_ = scope;

@@ -80,7 +80,13 @@ ParallelExecutor::ParallelExecutor(

   // Bcast Parameters to all GPUs
 #ifdef PADDLE_WITH_CUDA
-  member_->nccl_ctxs_.reset(new platform::NCCLContextMap(member_->places_));
+  auto *nccl_id_var = scope->FindVar("NCCLID");
+  ncclUniqueId *nccl_id = nullptr;
+  if (nccl_id_var != nullptr) {
+    nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
+  }
+  member_->nccl_ctxs_.reset(new platform::NCCLContextMap(
+      member_->places_, nccl_id, num_trainers, trainer_id));
 #endif
   if (platform::is_gpu_place(places[0]) && member_->local_scopes_.size() != 1 &&
       local_scopes.empty()) {  // Is CUDA
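
Note (not part of the commit; a hedged caller-side sketch): the constructor above only picks up a broadcast id when a variable named "NCCLID" already exists in the scope, so a multi-trainer launcher is assumed to create and fill that variable before building the ParallelExecutor, roughly:

    // hypothetical launcher-side setup; `trainer_scope` is an illustrative name
    auto* id_var = trainer_scope->Var("NCCLID");            // make FindVar("NCCLID") return non-null
    ncclUniqueId* id = id_var->GetMutable<ncclUniqueId>();  // raw 128-byte NCCL unique id
    // trainer 0 fills *id (see gen_nccl_id below); the other trainers receive it over gRPC
    // before constructing ParallelExecutor with num_trainers > 0 and their own trainer_id.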

paddle/fluid/framework/parallel_executor.h

Lines changed: 2 additions & 1 deletion
@@ -40,7 +40,8 @@ class ParallelExecutor {
                             const ProgramDesc& main_program,
                             const std::string& loss_var_name, Scope* scope,
                             const std::vector<Scope*>& local_scopes,
-                            bool allow_op_delay, bool use_default_grad_scale);
+                            bool allow_op_delay, bool use_default_grad_scale,
+                            size_t num_trainers = 0, size_t trainer_id = 0);

   ~ParallelExecutor();

paddle/fluid/operators/detail/send_recv.proto

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ service SendRecvService {
 enum VarType {
   LOD_TENSOR = 0;
   SELECTED_ROWS = 1;
+  NCCL_ID = 2;
 }

 // NOTICE(gongwb):don't modify this proto if you are not

paddle/fluid/operators/detail/sendrecvop_utils.cc

Lines changed: 20 additions & 1 deletion
@@ -43,13 +43,16 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
   void* buf = buffer.get();

   void* payload = nullptr;
-  size_t payload_size;
+  size_t payload_size = 0;
   ProtoEncodeHelper e(static_cast<char*>(buf), 1024);
   e.WriteString(VarMsg::kVarnameFieldNumber, name);
   if (var->IsType<framework::LoDTensor>()) {
     e.WriteUint64(VarMsg::kTypeFieldNumber, 0);
   } else if (var->IsType<framework::SelectedRows>()) {
     e.WriteUint64(VarMsg::kTypeFieldNumber, 1);
+  } else if (var->IsType<ncclUniqueId>()) {
+    // NOTE: sendrecv only support RAW type for NCCL_ID
+    e.WriteUint64(VarMsg::kTypeFieldNumber, 2);
   }

   if (!out_name.empty()) {

@@ -139,11 +142,27 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
       payload_size = tensor->numel() * framework::SizeOfType(tensor->type());
       e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber, payload_size);
     } break;
+    case framework::proto::VarType_Type_RAW: {
+      e.WriteVarlengthBeginning(VarMsg::kSerializedFieldNumber,
+                                NCCL_UNIQUE_ID_BYTES);
+      ncclUniqueId* uid = var->GetMutable<ncclUniqueId>();
+      e.WriteRawBytes(std::string(uid->internal, NCCL_UNIQUE_ID_BYTES));
+    } break;
     default:
       PADDLE_THROW("Serialize does not support type: %s",
                    typeid(var->Type()).name());
       break;
   }
+
+  if (framework::ToVarType(var->Type()) == framework::proto::VarType_Type_RAW) {
+    // for serialize NCCL_ID
+    ::grpc::Slice slices(e.size());
+    memcpy(const_cast<uint8_t*>(slices.begin()), e.data(), e.size());
+    ::grpc::ByteBuffer tmp(&slices, 1);
+    msg->Swap(&tmp);
+    return;
+  }
+
   // steal reference of tensor data
   ::grpc::Slice slices[4];  // metadata, tensor, rows meta, rows
   int num_slices = 2;  // only SelectedRows have rows buffer
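
For reference (from NCCL's public header, not part of this diff): the id written by WriteRawBytes above is a fixed-size opaque struct, which is why the RAW path can serialize it with a single copy into one gRPC slice instead of the multi-slice zero-copy path used for tensors:

    // from nccl.h (paraphrased)
    #define NCCL_UNIQUE_ID_BYTES 128
    typedef struct { char internal[NCCL_UNIQUE_ID_BYTES]; } ncclUniqueId;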

paddle/fluid/operators/detail/variable_response.cc

Lines changed: 10 additions & 1 deletion
@@ -367,9 +367,18 @@ int VariableResponse::Parse(Source* source) {
     }
     case sendrecv::VariableMessage::kSerializedFieldNumber: {
       PADDLE_ENFORCE((meta_.type() == sendrecv::SELECTED_ROWS ||
-                      meta_.type() == sendrecv::LOD_TENSOR) &&
+                      meta_.type() == sendrecv::LOD_TENSOR ||
+                      meta_.type() == sendrecv::NCCL_ID) &&
                          meta_.varname() != "",
                      "meta info should be got first!");
+      if (meta_.type() == sendrecv::NCCL_ID) {
+        auto* var = scope_->FindVar(meta_.varname());
+        if (var != nullptr) {
+          ncclUniqueId* id = var->GetMutable<ncclUniqueId>();
+          memcpy(id->internal, meta_.serialized().c_str(),
+                 meta_.serialized().size());
+        }
+      }

       int length = 0;
       if (wt != WIRETYPE_LENGTH_DELIMITED ||

paddle/fluid/operators/gen_nccl_id_op.cc

Lines changed: 2 additions & 2 deletions
@@ -54,7 +54,7 @@ class GenNCCLIdOp : public framework::OperatorBase {
     auto var = scope->FindVar("NCCLID");
     PADDLE_ENFORCE_NOT_NULL(var);
     auto id = var->GetMutable<ncclUniqueId>();
-    ncclGetUniqueId(id);
+    platform::dynload::ncclGetUniqueId(id);

     std::vector<std::string> endpoint_list =
         Attr<std::vector<std::string>>("endpoint_list");

@@ -120,4 +120,4 @@ For trainer 1~n: start a gRPC server to get the UniqueId, once got, stop the ser

 namespace ops = paddle::operators;

-REGISTER_OPERATOR(gen_nccl_id_op, ops::GenNCCLIdOp, ops::GenNCCLIdOpMaker);
+REGISTER_OPERATOR(gen_nccl_id, ops::GenNCCLIdOp, ops::GenNCCLIdOpMaker);

paddle/fluid/platform/nccl_helper.h

Lines changed: 26 additions & 6 deletions
@@ -73,7 +73,9 @@ struct NCCLContextMap {
   std::unordered_map<int, NCCLContext> contexts_;
   std::vector<int> order_;

-  explicit NCCLContextMap(const std::vector<platform::Place> &places) {
+  explicit NCCLContextMap(const std::vector<platform::Place> &places,
+                          ncclUniqueId *nccl_id = nullptr,
+                          size_t node_count = 0, size_t trainer_id = 0) {
     PADDLE_ENFORCE(!places.empty());
     order_.reserve(places.size());
     for (auto &p : places) {

@@ -85,18 +87,36 @@
                       order_.size(), contexts_.size(),
                       "NCCL Context Map does not support contain two or more same device");

-    if (places.size() > 1) {
-      std::unique_ptr<ncclComm_t[]> comms(new ncclComm_t[order_.size()]);
+    if (places.size() <= 1) {
+      return;
+    }
+    std::unique_ptr<ncclComm_t[]> comms(new ncclComm_t[order_.size()]);
+    // if pass nccl_id here, can assume we are doing multi node training
+    if (nccl_id == nullptr) {
       {
         std::lock_guard<std::mutex> guard(NCCLGroupGuard::NCCLMutex());
         PADDLE_ENFORCE(platform::dynload::ncclCommInitAll(
             comms.get(), static_cast<int>(order_.size()), order_.data()));
       }
-      int i = 0;
-      for (auto &dev_id : order_) {
-        contexts_.at(dev_id).comm_ = comms[i++];
+    } else {
+      PADDLE_ENFORCE_GT(node_count, 0);
+      PADDLE_ENFORCE_EQ(node_count % places.size(), 0,
+                        "must have same number of GPUs on each node");
+      {
+        std::lock_guard<std::mutex> guard(NCCLGroupGuard::NCCLMutex());
+        int nranks = node_count * order_.size();
+        for (auto &gpu_id : order_) {
+          int rank = trainer_id * order_.size() + gpu_id;
+          PADDLE_ENFORCE(cudaSetDevice(gpu_id));
+          PADDLE_ENFORCE(
+              ncclCommInitRank(comms.get() + gpu_id, nranks, *nccl_id, rank));
+        }
       }
     }
+    int i = 0;
+    for (auto &dev_id : order_) {
+      contexts_.at(dev_id).comm_ = comms[i++];
+    }
   }

   NCCLContextMap(const NCCLContextMap &other) = delete;
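
A small standalone sketch (assumptions: GPU ids are 0..N-1 on every node and every node uses the same number of GPUs, which is what the multi-node branch above relies on) showing the rank layout that ncclCommInitRank is given:

    #include <cstdio>

    int main() {
      const int node_count = 2, gpus_per_node = 2;    // hypothetical 2-node, 2-GPU cluster
      const int nranks = node_count * gpus_per_node;  // total NCCL ranks, as computed above
      for (int trainer_id = 0; trainer_id < node_count; ++trainer_id) {
        for (int gpu_id = 0; gpu_id < gpus_per_node; ++gpu_id) {
          // same formula as rank = trainer_id * order_.size() + gpu_id
          int rank = trainer_id * gpus_per_node + gpu_id;
          std::printf("trainer %d, gpu %d -> rank %d of %d\n", trainer_id, gpu_id, rank, nranks);
        }
      }
      return 0;
    }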

paddle/fluid/pybind/pybind.cc

Lines changed: 4 additions & 2 deletions
@@ -502,11 +502,13 @@ All parameter, weight, gradient are variables in Paddle.
              const std::unordered_set<std::string> &bcast_vars,
              const ProgramDesc &main_program, const std::string &loss_var_name,
              Scope *scope, std::vector<Scope *> &local_scopes,
-             bool allow_op_delay, bool use_default_grad_scale) {
+             bool allow_op_delay, bool use_default_grad_scale,
+             size_t num_trainers, size_t trainer_id) {
            new (&self) ParallelExecutor(
                num_threads, use_event, places, params, bcast_vars,
                main_program, loss_var_name, scope, local_scopes,
-               allow_op_delay, use_default_grad_scale);
+               allow_op_delay, use_default_grad_scale, num_trainers,
+               trainer_id);
          })
       .def("bcast_params", &ParallelExecutor::BCastParamsToGPUs)
       // NOTE: even we return a vec<Scope*>* to Python use reference policy.
