Skip to content

Commit 17009d0

Browse files
committed
workable version
1 parent a529d79 commit 17009d0

File tree

4 files changed

+98
-3
lines changed

4 files changed

+98
-3
lines changed

paddle/fluid/operators/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ if(WITH_DISTRIBUTE)
205205
set_source_files_properties(send_barrier_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
206206
set_source_files_properties(send_recv_op_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
207207
cc_test(test_send_recv SRCS send_recv_op_test.cc DEPS prefetch_op send_op listen_and_serv_op sum_op executor)
208+
cc_test(test_send_nccl_id SRCS test_send_nccl_id.cc DEPS send_op listen_and_serv_op executor)
208209
else()
209210
set(DEPS_OPS ${DEPS_OPS} send_op prefetch_op recv_op listen_and_serv_op send_vars_op send_barrier_op gen_nccl_id_op)
210211
endif()
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
2+
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License. */
14+
15+
#include <unistd.h>
16+
#include <string>
17+
#include <thread> // NOLINT
18+
19+
#include "gtest/gtest.h"
20+
#include "paddle/fluid/framework/op_registry.h"
21+
#include "paddle/fluid/framework/operator.h"
22+
#include "paddle/fluid/framework/program_desc.h"
23+
#include "paddle/fluid/operators/detail/grpc_client.h"
24+
#include "paddle/fluid/operators/listen_and_serv_op.h"
25+
#include "paddle/fluid/operators/math/math_function.h"
26+
#include "paddle/fluid/operators/math/selected_rows_functor.h"
27+
#include "paddle/fluid/string/printf.h"
28+
29+
USE_NO_KERNEL_OP(listen_and_serv);
30+
31+
namespace f = paddle::framework;
32+
namespace p = paddle::platform;
33+
namespace m = paddle::operators::math;
34+
namespace detail = paddle::operators::detail;
35+
namespace string = paddle::string;
36+
37+
std::unique_ptr<detail::AsyncGRPCServer> rpc_service;
38+
39+
// Spins up an AsyncGRPCServer on an OS-assigned port (127.0.0.1:0),
// blocks until one variable ("NCCLID") has been received, then shuts
// the server down. Intended to run on its own thread (see the test
// that launches it).
void StartServer() {
  f::Scope scope;
  p::CPUPlace place;
  // Pre-create the destination variable the client will send into.
  scope.Var("NCCLID");
  p::DeviceContextPool& pool = p::DeviceContextPool::Instance();
  auto& dev_ctx = *pool.Get(p::CPUPlace());

  // Port 0 lets the OS choose a free port; the client side later asks
  // the server which port was actually bound via GetSelectedPort().
  rpc_service.reset(new detail::AsyncGRPCServer("127.0.0.1:0", true));

  // The server requires a scope/device-context/program/executor to be
  // wired up even though this test only receives a single raw variable.
  f::ProgramDesc empty_program;
  f::Executor executor(dev_ctx.GetPlace());
  rpc_service->SetScope(&scope);
  rpc_service->SetDevCtx(&dev_ctx);
  rpc_service->SetProgram(&empty_program);
  rpc_service->SetExecutor(&executor);

  // Run the gRPC event loop on a separate thread so this function can
  // block on Get() below.
  std::thread server_thread(
      std::bind(&detail::AsyncGRPCServer::RunSyncUpdate, rpc_service.get()));
  // NOTE(review): SetCond(0) followed by Get() appears to unblock the
  // receive path and wait for one incoming variable -- confirm against
  // AsyncGRPCServer's contract before relying on this elsewhere.
  rpc_service->SetCond(0);
  auto recv = rpc_service->Get();
  LOG(INFO) << "got nccl id and stop server...";
  rpc_service->ShutDown();
  server_thread.join();
}
63+
64+
// End-to-end check: a client generates an NCCL unique id locally and
// sends it as the variable "NCCLID" to the listening server started by
// StartServer() on another thread.
TEST(SendNcclId, Normal) {
  std::thread server_thread(StartServer);
  // Give the server thread time to bind and start serving before we
  // query its selected port.
  // TODO(review): replace the fixed sleep with an explicit readiness
  // signal to avoid flakiness on slow machines.
  sleep(2);

  f::Scope scope;
  p::CPUPlace place;
  p::DeviceContextPool& pool = p::DeviceContextPool::Instance();
  auto& dev_ctx = *pool.Get(p::CPUPlace());

  // Generate a fresh NCCL unique id into the scope variable that will
  // be shipped to the server.
  auto var = scope.Var("NCCLID");
  auto id = var->GetMutable<ncclUniqueId>();
  p::dynload::ncclGetUniqueId(id);

  // The server was created with port 0, so ask which port the OS
  // actually assigned before building the endpoint string.
  int port = rpc_service->GetSelectedPort();
  std::string ep = string::Sprintf("127.0.0.1:%d", port);
  detail::RPCClient client;

  client.AsyncSendVariable(ep, dev_ctx, scope, "NCCLID");
  client.Wait();
  server_thread.join();
  // Destroy the server via the unique_ptr itself; the original
  // release()+delete pair did the same thing while side-stepping RAII.
  rpc_service.reset();
}

paddle/fluid/platform/nccl_helper.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#pragma once
1616

17+
#include <stdio.h>
1718
#include <thread> // NOLINT
1819
#include <typeindex>
1920
#include <vector>
@@ -100,13 +101,13 @@ struct NCCLContextMap {
100101
}
101102
} else {
102103
PADDLE_ENFORCE_GT(node_count, 0);
103-
PADDLE_ENFORCE_EQ(node_count % places.size(), 0,
104-
"must have same number of GPUs on each node");
104+
// TODO(wuyi): need to ensure each node has the same number of GPUs
105105
{
106-
std::lock_guard<std::mutex> guard(NCCLGroupGuard::NCCLMutex());
107106
int nranks = node_count * order_.size();
107+
NCCLGroupGuard gurad;
108108
for (auto &gpu_id : order_) {
109109
int rank = trainer_id * order_.size() + gpu_id;
110+
VLOG(3) << "init nccl rank: " << rank << " nranks: " << nranks;
110111
PADDLE_ENFORCE(cudaSetDevice(gpu_id));
111112
PADDLE_ENFORCE(platform::dynload::ncclCommInitRank(
112113
comms.get() + gpu_id, nranks, *nccl_id, rank));

python/paddle/fluid/parallel_executor.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ def __init__(self,
5353
gradients of each device and scaled gradients would be
5454
aggregated. Otherwise, a customized scale value should be fed
5555
to the network.
56+
num_nodes(int, default 0): If greater than 0, NCCL will be
57+
initialized with multiple ranks of nodes; each node should have
58+
same number of GPUs. Distributed training will be enabled then.
59+
trainer_id(int, default 0): Must use together with num_nodes.
60+
trainer_id is the "rank" of the current node, starting from 0.
5661
5762
Returns:
5863
A ParallelExecutor object.

0 commit comments

Comments
 (0)