Skip to content

Commit de71d16

Browse files
committed
initial implementation of buffer filling functions in db-copy
1 parent 8645d2f commit de71d16

21 files changed

+308
-205
lines changed

db-copy.cpp

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,32 @@ db_copy_thread_t::db_copy_thread_t(std::string const &conninfo)
2323
});
2424
}
2525

26+
db_copy_thread_t::~db_copy_thread_t() { finish(); }
27+
2628
void db_copy_thread_t::add_buffer(std::unique_ptr<db_cmd_t> &&buffer)
2729
{
30+
assert(m_worker.joinable()); // thread must not have been finished
2831
std::unique_lock<std::mutex> lock(m_queue_mutex);
2932
m_worker_queue.push_back(std::move(buffer));
33+
m_queue_cond.notify_one();
34+
}
35+
36+
void db_copy_thread_t::sync_and_wait()
37+
{
38+
std::promise<void> barrier;
39+
std::future<void> sync = barrier.get_future();
40+
add_buffer(std::unique_ptr<db_cmd_t>(new db_cmd_sync_t(std::move(barrier))));
41+
sync.wait();
3042
}
3143

3244
void db_copy_thread_t::finish()
3345
{
34-
add_buffer(std::unique_ptr<db_cmd_t>(new db_cmd_finish_t()));
35-
m_worker.join();
46+
if (m_worker.joinable()) {
47+
finish_copy();
48+
49+
add_buffer(std::unique_ptr<db_cmd_t>(new db_cmd_finish_t()));
50+
m_worker.join();
51+
}
3652
}
3753

3854
void db_copy_thread_t::worker_thread()
@@ -61,6 +77,7 @@ void db_copy_thread_t::worker_thread()
6177
execute_sql(static_cast<db_cmd_sql_t *>(item.get())->buffer);
6278
break;
6379
case db_cmd_t::Cmd_sync:
80+
finish_copy();
6481
static_cast<db_cmd_sync_t *>(item.get())->barrier.set_value();
6582
break;
6683
case db_cmd_t::Cmd_finish:
@@ -69,8 +86,7 @@ void db_copy_thread_t::worker_thread()
6986
}
7087
}
7188

72-
if (m_inflight)
73-
finish_copy();
89+
finish_copy();
7490

7591
disconnect();
7692
}
@@ -93,8 +109,7 @@ void db_copy_thread_t::connect()
93109

94110
void db_copy_thread_t::execute_sql(std::string const &sql_cmd)
95111
{
96-
if (m_inflight)
97-
finish_copy();
112+
finish_copy();
98113

99114
pgsql_exec_simple(m_conn, PGRES_COMMAND_OK, sql_cmd.c_str());
100115
}
@@ -117,7 +132,8 @@ void db_copy_thread_t::write_to_db(db_cmd_copy_t *buffer)
117132
if (!buffer->deletables.empty())
118133
delete_rows(buffer);
119134

120-
start_copy(buffer->target);
135+
if (!m_inflight)
136+
start_copy(buffer->target);
121137

122138
pgsql_CopyData(buffer->target->name.c_str(), m_conn, buffer->buffer);
123139
}
@@ -130,24 +146,21 @@ void db_copy_thread_t::delete_rows(db_cmd_copy_t *buffer)
130146
sql.reserve(buffer->target->name.size() + buffer->deletables.size() * 15 +
131147
30);
132148
sql += buffer->target->name;
133-
sql += "WHERE ";
149+
sql += " WHERE ";
134150
sql += buffer->target->id;
135151
sql += " IN (";
136152
for (auto id : buffer->deletables) {
137153
sql += std::to_string(id);
138154
sql += ',';
139155
}
140-
sql += ')';
156+
sql[sql.size() - 1] = ')';
141157

142158
pgsql_exec_simple(m_conn, PGRES_COMMAND_OK, sql);
143159
}
144160

145161
void db_copy_thread_t::start_copy(std::shared_ptr<db_target_descr_t> const &target)
146162
{
147-
if (m_inflight)
148-
return;
149-
150-
assert(m_inflight.get() == target.get());
163+
m_inflight = target;
151164

152165
std::string copystr = "COPY ";
153166
copystr.reserve(target->name.size() + target->rows.size() + 14);
@@ -165,6 +178,9 @@ void db_copy_thread_t::start_copy(std::shared_ptr<db_target_descr_t> const &targ
165178

166179
void db_copy_thread_t::finish_copy()
167180
{
181+
if (!m_inflight)
182+
return;
183+
168184
if (PQputCopyEnd(m_conn, nullptr) != 1)
169185
throw std::runtime_error((fmt("stop COPY_END for %1% failed: %2%\n") %
170186
m_inflight->name %
@@ -215,10 +231,12 @@ void db_copy_mgr_t::exec_sql(std::string const &sql_cmd)
215231

216232
void db_copy_mgr_t::sync()
217233
{
218-
std::promise<void> barrier;
219-
std::future<void> sync = barrier.get_future();
220-
m_processor->add_buffer(std::unique_ptr<db_cmd_t>(new db_cmd_sync_t(std::move(barrier))));
221-
sync.wait();
234+
// finish any ongoing copy operations
235+
if (m_current) {
236+
m_processor->add_buffer(std::move(m_current));
237+
}
238+
239+
m_processor->sync_and_wait();
222240
}
223241

224242
void db_copy_mgr_t::finish_line()

db-copy.hpp

Lines changed: 112 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include <condition_variable>
55
#include <deque>
6+
#include <future>
67
#include <memory>
78
#include <mutex>
89
#include <string>
@@ -108,12 +109,18 @@ class db_copy_thread_t
108109
{
109110
public:
110111
db_copy_thread_t(std::string const &conninfo);
112+
~db_copy_thread_t();
111113

112114
/**
113115
* Add another command for the worker.
114116
*/
115117
void add_buffer(std::unique_ptr<db_cmd_t> &&buffer);
116118

119+
/**
120+
* Send sync command and wait for the notification.
121+
*/
122+
void sync_and_wait();
123+
117124
/**
118125
* Finish the copy process.
119126
*
@@ -166,21 +173,82 @@ class db_copy_mgr_t
166173
*/
167174
void finish_line();
168175

169-
/**
170-
* Add a column entry of simple type.
171-
*/
176+
template <typename T, typename ...ARGS>
177+
void add_columns(T value, ARGS&&... args)
178+
{
179+
add_column(value);
180+
add_columns(args...);
181+
}
182+
183+
template <typename T>
184+
void add_columns(T value)
185+
{
186+
add_column(value);
187+
}
188+
189+
/// Add a column entry of simple type.
172190
template <typename T>
173191
void add_column(T value)
174192
{
175-
auto s = std::to_string(value);
176-
add_column<>(s);
193+
add_value(value);
194+
m_current->buffer += '\t';
195+
}
196+
197+
/// Add an empty column.
198+
void add_null_column() { m_current->buffer += "\\N\t"; }
199+
200+
/// Start an array column.
201+
void new_array() { m_current->buffer += "{"; }
202+
203+
/// Add a single value to an array column
204+
template <typename T>
205+
void add_array_elem(T value)
206+
{
207+
add_value(value);
208+
m_current->buffer += ',';
177209
}
178210

179-
template <>
180-
void add_column(std::string const &s)
211+
void add_array_elem(std::string const &s) { add_array_elem(s.c_str()); }
212+
213+
void add_array_elem(char const *s)
181214
{
182215
assert(m_current);
183-
m_current->buffer += s;
216+
m_current->buffer += '"';
217+
for (char const *c = s; *c; ++c) {
218+
switch (*c) {
219+
case '"':
220+
m_current->buffer += "\\\\\"";
221+
break;
222+
case '\\':
223+
m_current->buffer += "\\\\\\\\";
224+
break;
225+
case '\n':
226+
m_current->buffer += "\\n";
227+
break;
228+
case '\r':
229+
m_current->buffer += "\\r";
230+
break;
231+
case '\t':
232+
m_current->buffer += "\\t";
233+
break;
234+
default:
235+
m_current->buffer += *c;
236+
break;
237+
}
238+
}
239+
240+
m_current->buffer += "\",";
241+
}
242+
243+
/// Finish an array column.
244+
void finish_array()
245+
{
246+
auto idx = m_current->buffer.size() - 1;
247+
if (m_current->buffer[idx] == '{')
248+
m_current->buffer += '}';
249+
else
250+
m_current->buffer[idx] = '}';
251+
m_current->buffer += '\t';
184252
}
185253

186254
/**
@@ -207,6 +275,42 @@ class db_copy_mgr_t
207275
void sync();
208276

209277
private:
278+
template <typename T>
279+
void add_value(T value)
280+
{
281+
assert(m_current);
282+
m_current->buffer += std::to_string(value);
283+
}
284+
285+
void add_value(std::string const &s) { add_value(s.c_str()); }
286+
287+
void add_value(char const *s)
288+
{
289+
assert(m_current);
290+
for (char const *c = s; *c; ++c) {
291+
switch (*c) {
292+
case '"':
293+
m_current->buffer += "\\\"";
294+
break;
295+
case '\\':
296+
m_current->buffer += "\\\\";
297+
break;
298+
case '\n':
299+
m_current->buffer += "\\n";
300+
break;
301+
case '\r':
302+
m_current->buffer += "\\r";
303+
break;
304+
case '\t':
305+
m_current->buffer += "\\t";
306+
break;
307+
default:
308+
m_current->buffer += *c;
309+
break;
310+
}
311+
}
312+
}
313+
210314
std::shared_ptr<db_copy_thread_t> m_processor;
211315
std::unique_ptr<db_cmd_copy_t> m_current;
212316
unsigned m_last_line;

0 commit comments

Comments
 (0)