Skip to content

Commit 251e4a8

Browse files
committed
unify fluid blocking queue
1 parent 6c0356e commit 251e4a8

File tree

6 files changed

+81
-98
lines changed

6 files changed

+81
-98
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
2+
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License. */
14+
15+
#pragma once
16+
17+
#include <condition_variable> // NOLINT
18+
#include <deque>
19+
#include <mutex> // NOLINT
20+
#include <utility>
21+
22+
namespace paddle {
23+
namespace framework {
24+
25+
template <typename T>
26+
class BlockingQueue {
27+
public:
28+
void Push(const T &item) {
29+
{
30+
std::lock_guard<std::mutex> g(mutex_);
31+
q_.emplace_back(item);
32+
}
33+
cv_.notify_one();
34+
}
35+
36+
template <typename U>
37+
void Extend(const U &items) {
38+
{
39+
std::lock_guard<std::mutex> g(mutex_);
40+
for (auto &item : items) {
41+
q_.emplace_back(item);
42+
}
43+
}
44+
cv_.notify_all();
45+
}
46+
47+
std::deque<T> PopAll(size_t ms, bool *timeout) {
48+
auto time =
49+
std::chrono::system_clock::now() + std::chrono::milliseconds(ms);
50+
std::unique_lock<std::mutex> lock(mutex_);
51+
*timeout = !cv_.wait_until(lock, time, [this] { return !q_.empty(); });
52+
std::deque<T> ret;
53+
if (!*timeout) {
54+
std::swap(ret, q_);
55+
}
56+
return ret;
57+
}
58+
59+
T Pop() {
60+
std::unique_lock<std::mutex> lock(mutex_);
61+
cv_.wait(lock, [=] { return !q_.empty(); });
62+
T rc(std::move(q_.front()));
63+
q_.pop_front();
64+
return rc;
65+
}
66+
67+
private:
68+
std::mutex mutex_;
69+
std::condition_variable cv_;
70+
std::deque<T> q_;
71+
};
72+
73+
} // namespace framework
74+
} // namespace paddle

paddle/fluid/framework/details/threaded_ssa_graph_executor.h

Lines changed: 1 addition & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
#include <functional>
2424
#include "ThreadPool.h" // ThreadPool in thrird party
25+
#include "paddle/fluid/framework/blocking_queue.h"
2526
#include "paddle/fluid/framework/details/ssa_graph_executor.h"
2627

2728
namespace paddle {
@@ -30,46 +31,6 @@ class Scope;
3031

3132
namespace details {
3233

33-
template <typename T>
34-
class BlockingQueue {
35-
public:
36-
void Push(const T &item) {
37-
{
38-
std::lock_guard<std::mutex> g(mutex_);
39-
q_.emplace_back(item);
40-
}
41-
cv_.notify_one();
42-
}
43-
44-
template <typename U>
45-
void Extend(const U &items) {
46-
{
47-
std::lock_guard<std::mutex> g(mutex_);
48-
for (auto &item : items) {
49-
q_.emplace_back(item);
50-
}
51-
}
52-
cv_.notify_all();
53-
}
54-
55-
std::deque<T> PopAll(size_t ms, bool *timeout) {
56-
auto time =
57-
std::chrono::system_clock::now() + std::chrono::milliseconds(ms);
58-
std::unique_lock<std::mutex> lock(mutex_);
59-
*timeout = !cv_.wait_until(lock, time, [this] { return !q_.empty(); });
60-
std::deque<T> ret;
61-
if (!*timeout) {
62-
std::swap(ret, q_);
63-
}
64-
return ret;
65-
}
66-
67-
private:
68-
std::mutex mutex_;
69-
std::condition_variable cv_;
70-
std::deque<T> q_;
71-
};
72-
7334
class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
7435
public:
7536
ThreadedSSAGraphExecutor(size_t num_threads, bool use_event,

paddle/fluid/operators/detail/grpc_client.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@ limitations under the License. */
2929
#include "grpc++/support/byte_buffer.h"
3030
#include "grpc++/support/slice.h"
3131
#include "grpc/support/log.h"
32+
#include "paddle/fluid/framework/blocking_queue.h"
3233
#include "paddle/fluid/framework/data_type.h"
3334
#include "paddle/fluid/framework/lod_tensor.h"
3435
#include "paddle/fluid/framework/scope.h"
3536
#include "paddle/fluid/framework/selected_rows.h"
3637
#include "paddle/fluid/operators/detail/sendrecvop_utils.h"
37-
#include "paddle/fluid/operators/detail/simple_block_queue.h"
3838

3939
namespace paddle {
4040
namespace operators {

paddle/fluid/operators/detail/grpc_server.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ class RequestGet final : public RequestBase {
9090
::grpc::ServerCompletionQueue* cq,
9191
framework::Scope* scope,
9292
const platform::DeviceContext* dev_ctx,
93-
SimpleBlockQueue<MessageWithName>* queue)
93+
framework::BlockingQueue<MessageWithName>* queue)
9494
: RequestBase(service, cq, dev_ctx),
9595
responder_(&ctx_),
9696
scope_(scope),
@@ -128,7 +128,7 @@ class RequestGet final : public RequestBase {
128128
sendrecv::VariableMessage request_;
129129
ServerAsyncResponseWriter<::grpc::ByteBuffer> responder_;
130130
framework::Scope* scope_;
131-
SimpleBlockQueue<MessageWithName>* queue_;
131+
framework::BlockingQueue<MessageWithName>* queue_;
132132
};
133133

134134
class RequestPrefetch final : public RequestBase {

paddle/fluid/operators/detail/grpc_server.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ limitations under the License. */
1919
#include <utility>
2020

2121
#include "grpc++/grpc++.h"
22+
#include "paddle/fluid/framework/blocking_queue.h"
2223
#include "paddle/fluid/framework/executor.h"
2324
#include "paddle/fluid/framework/lod_tensor.h"
2425
#include "paddle/fluid/framework/program_desc.h"
@@ -29,15 +30,14 @@ limitations under the License. */
2930
#include "paddle/fluid/operators/detail/send_recv.grpc.pb.h"
3031
#include "paddle/fluid/operators/detail/send_recv.pb.h"
3132
#include "paddle/fluid/operators/detail/sendrecvop_utils.h"
32-
#include "paddle/fluid/operators/detail/simple_block_queue.h"
3333

3434
namespace paddle {
3535
namespace operators {
3636
namespace detail {
3737

3838
typedef std::pair<std::string, std::shared_ptr<VariableResponse>>
3939
ReceivedMessage;
40-
typedef SimpleBlockQueue<ReceivedMessage> ReceivedQueue;
40+
typedef framework::BlockingQueue<ReceivedMessage> ReceivedQueue;
4141

4242
typedef std::pair<std::string, sendrecv::VariableMessage> MessageWithName;
4343
class RequestBase;
@@ -99,7 +99,7 @@ class AsyncGRPCServer final {
9999
const platform::DeviceContext *dev_ctx_;
100100

101101
// received variable from RPC, operators fetch variable from this queue.
102-
SimpleBlockQueue<MessageWithName> var_get_queue_;
102+
framework::BlockingQueue<MessageWithName> var_get_queue_;
103103
// client send variable to this queue.
104104
ReceivedQueue var_recv_queue_;
105105

paddle/fluid/operators/detail/simple_block_queue.h

Lines changed: 0 additions & 52 deletions
This file was deleted.

0 commit comments

Comments
 (0)