File tree Expand file tree Collapse file tree 4 files changed +28
-0
lines changed
paddle/fluid/operators/distributed Expand file tree Collapse file tree 4 files changed +28
-0
lines changed Original file line number Diff line number Diff line change @@ -22,6 +22,8 @@ limitations under the License. */
22
22
#include " paddle/fluid/operators/distributed/request_handler.h"
23
23
#include " paddle/fluid/platform/profiler.h"
24
24
25
+ DECLARE_bool (rpc_disable_reuse_port);
26
+
25
27
namespace paddle {
26
28
namespace operators {
27
29
namespace distributed {
@@ -383,6 +385,9 @@ std::shared_ptr<grpc::Channel> GRPCClient::GetChannel(const std::string& ep) {
383
385
// Channel configurations:
384
386
grpc::ChannelArguments args;
385
387
args.SetInt (GRPC_ARG_MAX_RECONNECT_BACKOFF_MS, 2000 );
388
+ if (FLAGS_rpc_disable_reuse_port) {
389
+ args.SetInt (GRPC_ARG_ALLOW_REUSEPORT, 0 );
390
+ }
386
391
args.SetCompressionAlgorithm (GRPC_COMPRESS_NONE);
387
392
args.SetMaxSendMessageSize (std::numeric_limits<int >::max ());
388
393
args.SetMaxReceiveMessageSize (std::numeric_limits<int >::max ());
Original file line number Diff line number Diff line change @@ -20,6 +20,8 @@ limitations under the License. */
20
20
21
21
using ::grpc::ServerAsyncResponseWriter;
22
22
23
+ DECLARE_bool (rpc_disable_reuse_port);
24
+
23
25
namespace paddle {
24
26
namespace operators {
25
27
namespace distributed {
@@ -252,13 +254,31 @@ void AsyncGRPCServer::WaitServerReady() {
252
254
VLOG (40 ) << " AsyncGRPCServer WaitSeverReady" ;
253
255
}
254
256
257
+ // Define an option subclass in order to disable SO_REUSEPORT for the
258
+ // server socket.
259
+ // Come from:
260
+ // https://github.com/tensorflow/tensorflow/blob/master/tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc
261
+ class NoReusePortOption : public ::grpc::ServerBuilderOption {
262
+ public:
263
+ void UpdateArguments (::grpc::ChannelArguments* args) override {
264
+ args->SetInt (GRPC_ARG_ALLOW_REUSEPORT, 0 );
265
+ }
266
+
267
+ void UpdatePlugins (std::vector<std::unique_ptr<::grpc::ServerBuilderPlugin>>*
268
+ plugins) override {}
269
+ };
270
+
255
271
void AsyncGRPCServer::StartServer () {
256
272
::grpc::ServerBuilder builder;
257
273
builder.AddListeningPort (bind_address_, ::grpc::InsecureServerCredentials (),
258
274
&selected_port_);
259
275
260
276
builder.SetMaxSendMessageSize (std::numeric_limits<int >::max ());
261
277
builder.SetMaxReceiveMessageSize (std::numeric_limits<int >::max ());
278
+ if (FLAGS_rpc_disable_reuse_port) {
279
+ builder.SetOption (
280
+ std::unique_ptr<::grpc::ServerBuilderOption>(new NoReusePortOption));
281
+ }
262
282
builder.RegisterService (&service_);
263
283
264
284
for (auto t : rpc_call_map_) {
Original file line number Diff line number Diff line change @@ -22,6 +22,8 @@ limitations under the License. */
22
22
#include " paddle/fluid/operators/distributed/sendrecvop_utils.h"
23
23
#include " paddle/fluid/operators/distributed/variable_response.h"
24
24
25
+ DEFINE_bool (rpc_disable_reuse_port, false , " Disable SO_REUSEPORT or not." );
26
+
25
27
namespace paddle {
26
28
namespace operators {
27
29
namespace distributed {
Original file line number Diff line number Diff line change @@ -129,6 +129,7 @@ def __bootstrap__():
129
129
read_env_flags .append ('rpc_send_thread_num' )
130
130
read_env_flags .append ('rpc_get_thread_num' )
131
131
read_env_flags .append ('rpc_prefetch_thread_num' )
132
+ read_env_flags .append ('rpc_disable_reuse_port' )
132
133
133
134
if core .is_compiled_with_cuda ():
134
135
read_env_flags += [
You can’t perform that action at this time.
0 commit comments