Skip to content

Commit 42a15a4

Browse files
committed
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into add-async-listen-and-serv-op
2 parents 63055a3 + 6c0356e commit 42a15a4

File tree

6 files changed

+27
-29
lines changed

6 files changed

+27
-29
lines changed

paddle/fluid/framework/details/threaded_ssa_graph_executor.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
128128
//
129129
// NOTE: DelayedOps have a lower priority. It will be scheduled after all
130130
// ready_ops have been performed.
131-
if (ready_ops.empty() && allow_op_delay_) {
131+
if (ready_ops.empty() && allow_op_delay_ && running_ops_ == 0) {
132132
run_all_ops(delayed_ops);
133133
} else {
134134
run_all_ops(ready_ops);

paddle/fluid/operators/detail/grpc_server.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,9 @@ class RequestSend final : public RequestBase {
6868
queue_(queue),
6969
responder_(&ctx_) {
7070
if (sync_mode_) {
71-
request_.reset(new VariableResponse(false, scope, dev_ctx_));
71+
request_.reset(new VariableResponse(scope, dev_ctx_, false));
7272
} else {
73-
request_.reset(new VariableResponse(true, scope, dev_ctx_));
73+
request_.reset(new VariableResponse(scope, dev_ctx_, true));
7474
}
7575
int method_id = static_cast<int>(detail::GrpcMethod::kSendVariable);
7676
service_->RequestAsyncUnary(method_id, &ctx_, request_.get(), &responder_,
@@ -158,9 +158,9 @@ class RequestPrefetch final : public RequestBase {
158158
program_(program),
159159
prefetch_ctx_(prefetch_ctx) {
160160
if (sync_mode_) {
161-
request_.reset(new VariableResponse(false, scope, dev_ctx_));
161+
request_.reset(new VariableResponse(scope, dev_ctx_, false));
162162
} else {
163-
request_.reset(new VariableResponse(true, scope, dev_ctx_));
163+
request_.reset(new VariableResponse(scope, dev_ctx_, true));
164164
}
165165
int method_id = static_cast<int>(detail::GrpcMethod::kPrefetchVariable);
166166
service_->RequestAsyncUnary(method_id, &ctx_, request_.get(), &responder_,

paddle/fluid/operators/detail/sendrecvop_utils.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg,
186186
const platform::DeviceContext& ctx,
187187
const framework::Scope* scope,
188188
framework::Variable** var) {
189-
operators::detail::VariableResponse resp(false, scope, &ctx);
189+
operators::detail::VariableResponse resp(scope, &ctx);
190190
PADDLE_ENFORCE(resp.Parse(msg) == 0, "parse bytebuffer to tensor error!");
191191
*var = resp.GetVar();
192192
}

paddle/fluid/operators/detail/serde_test.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ void RunSerdeTestSelectedRows(platform::Place place) {
5151

5252
::grpc::ByteBuffer msg;
5353
operators::detail::SerializeToByteBuffer("myvar", &var, ctx, &msg);
54-
EXPECT_GT(msg.Length(), 0);
54+
EXPECT_GT(msg.Length(), static_cast<size_t>(0));
5555

5656
// deserialize
5757
std::vector<::grpc::Slice> slices;
@@ -84,7 +84,7 @@ void RunSerdeTestSelectedRows(platform::Place place) {
8484
// operators::detail::DeserializeFromByteBuffer(msg, ctx, &var2);
8585
framework::Scope scope;
8686
scope.Var("myvar");
87-
operators::detail::VariableResponse resp(false, &scope, &ctx);
87+
operators::detail::VariableResponse resp(&scope, &ctx);
8888
EXPECT_EQ(resp.Parse(msg), 0);
8989

9090
framework::Variable* var2 = resp.GetVar();
@@ -129,7 +129,7 @@ void RunTestLodTensor(platform::Place place, int from_type = 0) {
129129

130130
::grpc::ByteBuffer msg;
131131
operators::detail::SerializeToByteBuffer("myvar", &var, ctx, &msg);
132-
EXPECT_GT(msg.Length(), 0);
132+
EXPECT_GT(msg.Length(), static_cast<size_t>(0));
133133

134134
// deserialize
135135
std::vector<::grpc::Slice> slices;
@@ -171,7 +171,7 @@ void RunTestLodTensor(platform::Place place, int from_type = 0) {
171171
// deserialize zero-copy
172172
framework::Scope scope;
173173
scope.Var("myvar");
174-
operators::detail::VariableResponse resp(false, &scope, &ctx);
174+
operators::detail::VariableResponse resp(&scope, &ctx);
175175
if (from_type == 0) {
176176
EXPECT_EQ(resp.Parse(msg), 0);
177177
} else {

paddle/fluid/operators/detail/variable_response.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ bool VariableResponse::CopyLodTensorData(
114114
::google::protobuf::io::CodedInputStream* input,
115115
const platform::DeviceContext& ctx, const framework::DDim& dims,
116116
int length) {
117-
auto* tensor = InitVar()->GetMutable<framework::LoDTensor>();
117+
auto* tensor = GetVar()->GetMutable<framework::LoDTensor>();
118118
tensor->Resize(dims);
119119

120120
framework::LoD lod;
@@ -150,7 +150,7 @@ bool VariableResponse::CopySelectRowsTensorData(
150150
::google::protobuf::io::CodedInputStream* input,
151151
const platform::DeviceContext& ctx, const framework::DDim& dims,
152152
int length) {
153-
auto* slr = InitVar()->GetMutable<framework::SelectedRows>();
153+
auto* slr = GetVar()->GetMutable<framework::SelectedRows>();
154154
slr->set_height(meta_.slr_height());
155155
auto* tensor = slr->mutable_value();
156156
tensor->Resize(dims);
@@ -172,7 +172,7 @@ bool VariableResponse::CopySelectRowsTensorData(
172172
bool VariableResponse::CopySelectRowsData(
173173
::google::protobuf::io::CodedInputStream* input,
174174
const platform::DeviceContext& ctx, int length) {
175-
auto* slr = InitVar()->GetMutable<framework::SelectedRows>();
175+
auto* slr = GetVar()->GetMutable<framework::SelectedRows>();
176176
slr->mutable_rows()->resize(length /
177177
framework::SizeOfType(typeid(int64_t))); // int64
178178
int64_t* rows_data = slr->mutable_rows()->data();

paddle/fluid/operators/detail/variable_response.h

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,18 @@ namespace detail {
3636

3737
class VariableResponse {
3838
public:
39-
VariableResponse(bool use_local_scope, const framework::Scope* scope,
40-
const platform::DeviceContext* dev_ctx)
41-
: use_local_scope_(use_local_scope), scope_(scope), dev_ctx_(dev_ctx) {
42-
local_scope_ = &scope->NewScope();
39+
VariableResponse(const framework::Scope* scope,
40+
const platform::DeviceContext* dev_ctx,
41+
bool create_scope = false)
42+
: scope_(scope), dev_ctx_(dev_ctx), create_scope_(create_scope) {
43+
if (create_scope) {
44+
local_scope_ = &scope->NewScope();
45+
}
4346
}
4447

45-
virtual ~VariableResponse() { scope_->DeleteScope(local_scope_); }
48+
virtual ~VariableResponse() {
49+
if (create_scope_) scope_->DeleteScope(local_scope_);
50+
}
4651

4752
// return:
4853
// 0:ok.
@@ -63,17 +68,10 @@ class VariableResponse {
6368

6469
// should call parse first.
6570
framework::Variable* GetVar() {
66-
return local_scope_->FindVar(meta_.varname());
67-
}
68-
69-
framework::Variable* InitVar() {
70-
if (use_local_scope_) {
71-
bool has_var = (scope_->FindVar(meta_.varname()) != nullptr);
72-
PADDLE_ENFORCE(has_var);
71+
if (create_scope_) {
7372
return local_scope_->Var(meta_.varname());
74-
} else {
75-
return scope_->FindVar(meta_.varname());
7673
}
74+
return scope_->FindVar(meta_.varname());
7775
}
7876

7977
private:
@@ -89,10 +87,10 @@ class VariableResponse {
8987
const framework::DDim& dims, int length);
9088

9189
private:
92-
bool use_local_scope_ = false;
9390
const framework::Scope* scope_;
94-
framework::Scope* local_scope_ = nullptr;
9591
const platform::DeviceContext* dev_ctx_;
92+
bool create_scope_ = false;
93+
framework::Scope* local_scope_ = nullptr;
9694
// only Skeleton
9795
sendrecv::VariableMessage meta_;
9896
};

0 commit comments

Comments
 (0)