Skip to content

Commit 8f066ab

Browse files
authored
Merge pull request #960 from lonvia/make-copy-queue-bounded
limit the number of buffers that are in the copy queue
2 parents 8965e65 + 2c71a2b commit 8f066ab

File tree

2 files changed

+23
-5
lines changed

2 files changed

+23
-5
lines changed

db-copy.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ db_copy_thread_t::~db_copy_thread_t() { finish(); }
2828
void db_copy_thread_t::add_buffer(std::unique_ptr<db_cmd_t> &&buffer)
2929
{
3030
assert(m_worker.joinable()); // thread must not have been finished
31+
3132
std::unique_lock<std::mutex> lock(m_queue_mutex);
33+
m_queue_full_cond.wait(lock,
34+
[&]{ return m_worker_queue.size() < db_cmd_copy_t::Max_buffers; });
35+
3236
m_worker_queue.push_back(std::move(buffer));
3337
m_queue_cond.notify_one();
3438
}
@@ -60,13 +64,11 @@ void db_copy_thread_t::worker_thread()
6064
std::unique_ptr<db_cmd_t> item;
6165
{
6266
std::unique_lock<std::mutex> lock(m_queue_mutex);
63-
if (m_worker_queue.empty()) {
64-
m_queue_cond.wait(lock);
65-
continue;
66-
}
67+
m_queue_cond.wait(lock, [&]{ return !m_worker_queue.empty(); });
6768

6869
item = std::move(m_worker_queue.front());
6970
m_worker_queue.pop_front();
71+
m_queue_full_cond.notify_one();
7072
}
7173

7274
switch (item->type) {

db-copy.hpp

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,22 @@ class db_cmd_t
6767

6868
struct db_cmd_copy_t : public db_cmd_t
6969
{
70-
enum { Max_buf_size = 10 * 1024 * 1024 };
70+
enum {
71+
/** Size of a single buffer with COPY data for Postgresql.
72+
* This is a trade-off between memory usage and sending large chunks
73+
* to speed up processing. Currently a one-size fits all value.
74+
* Needs more testing and individual values per queue.
75+
*/
76+
Max_buf_size = 10 * 1024 * 1024,
77+
/** Maximum length of the queue with COPY data.
78+
* In the usual case, PostgreSQL should be faster processing the
79+
* data than it can be produced and there should only be one element
80+
* in the queue. If PostgreSQL is slower, then the queue will always
81+
* be full and it is better to keep the queue smaller to reduce memory
82+
* usage. Current value is just assumed to be a reasonable trade off.
83+
*/
84+
Max_buffers = 10
85+
};
7186
/// Name of the target table for the copy operation
7287
std::shared_ptr<db_target_descr_t> target;
7388
/// Vector with object to delete before copying
@@ -141,6 +156,7 @@ class db_copy_thread_t
141156
std::thread m_worker;
142157
std::mutex m_queue_mutex;
143158
std::condition_variable m_queue_cond;
159+
std::condition_variable m_queue_full_cond;
144160
std::deque<std::unique_ptr<db_cmd_t>> m_worker_queue;
145161

146162
// Target for copy operation currently ongoing.

0 commit comments

Comments
 (0)