Skip to content

Commit 8645d2f

Browse files
committed
middle-pgsql: adapt table setup and teardown to db-copy use
1 parent fc33126 commit 8645d2f

File tree

5 files changed

+306
-466
lines changed

5 files changed

+306
-466
lines changed

db-copy.cpp

Lines changed: 75 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include <boost/format.hpp>
22
#include <cassert>
33
#include <cstdio>
4+
#include <future>
45
#include <thread>
56

67
#include "db-copy.hpp"
@@ -22,24 +23,25 @@ db_copy_thread_t::db_copy_thread_t(std::string const &conninfo)
2223
});
2324
}
2425

25-
void db_copy_thread_t::add_buffer(std::unique_ptr<db_copy_buffer_t> &&buffer)
26+
void db_copy_thread_t::add_buffer(std::unique_ptr<db_cmd_t> &&buffer)
2627
{
2728
std::unique_lock<std::mutex> lock(m_queue_mutex);
2829
m_worker_queue.push_back(std::move(buffer));
2930
}
3031

3132
void db_copy_thread_t::finish()
3233
{
33-
add_buffer(std::unique_ptr<db_copy_buffer_t>());
34+
add_buffer(std::unique_ptr<db_cmd_t>(new db_cmd_finish_t()));
3435
m_worker.join();
3536
}
3637

3738
void db_copy_thread_t::worker_thread()
3839
{
3940
connect();
4041

41-
for (;;) {
42-
std::unique_ptr<db_copy_buffer_t> item;
42+
bool done = false;
43+
while (!done) {
44+
std::unique_ptr<db_cmd_t> item;
4345
{
4446
std::unique_lock<std::mutex> lock(m_queue_mutex);
4547
if (m_worker_queue.empty()) {
@@ -51,19 +53,26 @@ void db_copy_thread_t::worker_thread()
5153
m_worker_queue.pop_front();
5254
}
5355

54-
if (!item)
55-
break;
56-
57-
if (item->is_copy_buffer())
58-
write_to_db(std::move(item));
59-
else
60-
execute_sql(item->buffer);
56+
switch (item->type) {
57+
case db_cmd_t::Cmd_copy:
58+
write_to_db(static_cast<db_cmd_copy_t *>(item.get()));
59+
break;
60+
case db_cmd_t::Cmd_sql:
61+
execute_sql(static_cast<db_cmd_sql_t *>(item.get())->buffer);
62+
break;
63+
case db_cmd_t::Cmd_sync:
64+
static_cast<db_cmd_sync_t *>(item.get())->barrier.set_value();
65+
break;
66+
case db_cmd_t::Cmd_finish:
67+
done = true;
68+
break;
69+
}
6170
}
6271

6372
if (m_inflight)
6473
finish_copy();
6574

66-
commit();
75+
disconnect();
6776
}
6877

6978
void db_copy_thread_t::connect()
@@ -80,8 +89,6 @@ void db_copy_thread_t::connect()
8089
// Let commits happen faster by delaying when they actually occur.
8190
pgsql_exec_simple(m_conn, PGRES_COMMAND_OK,
8291
"SET synchronous_commit TO off;");
83-
// Wrap everything into one huge transaction. XXX is that a good idea?
84-
pgsql_exec_simple(m_conn, PGRES_COMMAND_OK, "BEGIN");
8592
}
8693

8794
void db_copy_thread_t::execute_sql(std::string const &sql_cmd)
@@ -92,34 +99,30 @@ void db_copy_thread_t::execute_sql(std::string const &sql_cmd)
9299
pgsql_exec_simple(m_conn, PGRES_COMMAND_OK, sql_cmd.c_str());
93100
}
94101

95-
void db_copy_thread_t::commit()
102+
void db_copy_thread_t::disconnect()
96103
{
97104
if (!m_conn)
98105
return;
99106

100-
fprintf(stderr, "Committing transactions\n");
101-
pgsql_exec_simple(m_conn, PGRES_COMMAND_OK, "COMMIT");
102-
103107
PQfinish(m_conn);
104108
m_conn = nullptr;
105109
}
106110

107-
void db_copy_thread_t::write_to_db(std::unique_ptr<db_copy_buffer_t> &&buffer)
111+
void db_copy_thread_t::write_to_db(db_cmd_copy_t *buffer)
108112
{
109113
if (!buffer->deletables.empty() ||
110-
(m_inflight && !buffer->target->same_copy_target(*m_inflight->target)))
114+
(m_inflight && !buffer->target->same_copy_target(*m_inflight.get())))
111115
finish_copy();
112116

113117
if (!buffer->deletables.empty())
114-
delete_rows(buffer.get());
118+
delete_rows(buffer);
115119

116-
start_copy(std::move(buffer));
120+
start_copy(buffer->target);
117121

118-
pgsql_CopyData(m_inflight->target->name.c_str(), m_conn,
119-
m_inflight->buffer);
122+
pgsql_CopyData(buffer->target->name.c_str(), m_conn, buffer->buffer);
120123
}
121124

122-
void db_copy_thread_t::delete_rows(db_copy_buffer_t *buffer)
125+
void db_copy_thread_t::delete_rows(db_cmd_copy_t *buffer)
123126
{
124127
assert(!m_inflight);
125128

@@ -139,37 +142,39 @@ void db_copy_thread_t::delete_rows(db_copy_buffer_t *buffer)
139142
pgsql_exec_simple(m_conn, PGRES_COMMAND_OK, sql);
140143
}
141144

142-
void db_copy_thread_t::start_copy(std::unique_ptr<db_copy_buffer_t> &&buffer)
145+
void db_copy_thread_t::start_copy(std::shared_ptr<db_target_descr_t> const &target)
143146
{
144-
if (!m_inflight) {
145-
std::string copystr = "COPY ";
146-
copystr.reserve(buffer->target->name.size() +
147-
buffer->target->rows.size() + 14);
148-
copystr += buffer->target->name;
149-
if (!buffer->target->rows.empty()) {
150-
copystr += '(';
151-
copystr += buffer->target->rows;
152-
copystr += ')';
153-
}
154-
copystr += " FROM STDIN";
155-
pgsql_exec_simple(m_conn, PGRES_COPY_IN, copystr);
147+
if (m_inflight)
148+
return;
149+
150+
assert(m_inflight.get() == target.get());
151+
152+
std::string copystr = "COPY ";
153+
copystr.reserve(target->name.size() + target->rows.size() + 14);
154+
copystr += target->name;
155+
if (!target->rows.empty()) {
156+
copystr += '(';
157+
copystr += target->rows;
158+
copystr += ')';
156159
}
160+
copystr += " FROM STDIN";
161+
pgsql_exec_simple(m_conn, PGRES_COPY_IN, copystr);
157162

158-
m_inflight = std::move(buffer);
163+
m_inflight = target;
159164
}
160165

161166
void db_copy_thread_t::finish_copy()
162167
{
163168
if (PQputCopyEnd(m_conn, nullptr) != 1)
164169
throw std::runtime_error((fmt("stop COPY_END for %1% failed: %2%\n") %
165-
m_inflight->target->name %
170+
m_inflight->name %
166171
PQerrorMessage(m_conn))
167172
.str());
168173

169174
pg_result_t res(PQgetResult(m_conn));
170175
if (PQresultStatus(res.get()) != PGRES_COMMAND_OK)
171176
throw std::runtime_error((fmt("result COPY_END for %1% failed: %2%\n") %
172-
m_inflight->target->name %
177+
m_inflight->name %
173178
PQerrorMessage(m_conn))
174179
.str());
175180

@@ -187,7 +192,7 @@ void db_copy_mgr_t::new_line(std::shared_ptr<db_target_descr_t> const &table)
187192
m_processor->add_buffer(std::move(m_current));
188193
}
189194

190-
m_current.reset(new db_copy_buffer_t(table));
195+
m_current.reset(new db_cmd_copy_t(table));
191196
}
192197
}
193198

@@ -205,6 +210,32 @@ void db_copy_mgr_t::exec_sql(std::string const &sql_cmd)
205210
}
206211

207212
// and add SQL command
208-
m_current.reset(new db_copy_buffer_t(sql_cmd));
209-
m_processor->add_buffer(std::move(m_current));
213+
m_processor->add_buffer(std::unique_ptr<db_cmd_t>(new db_cmd_sql_t(sql_cmd)));
214+
}
215+
216+
void db_copy_mgr_t::sync()
217+
{
218+
std::promise<void> barrier;
219+
std::future<void> sync = barrier.get_future();
220+
m_processor->add_buffer(std::unique_ptr<db_cmd_t>(new db_cmd_sync_t(std::move(barrier))));
221+
sync.wait();
210222
}
223+
224+
void db_copy_mgr_t::finish_line()
225+
{
226+
assert(m_current);
227+
228+
auto &buf = m_current->buffer;
229+
assert(!buf.empty());
230+
231+
// Expect that a column has been written last which ended in a '\t'.
232+
// Replace it with the row delimiter '\n'.
233+
auto sz = buf.size();
234+
assert(buf[sz - 1] == '\t');
235+
buf[sz - 1] = '\n';
236+
237+
if (sz > db_cmd_copy_t::Max_buf_size - 100) {
238+
m_processor->add_buffer(std::move(m_current));
239+
}
240+
}
241+

0 commit comments

Comments
 (0)