Skip to content

Commit fc33126

Browse files
committed
introduce copy object responsible for database loading
1 parent 0275c1a commit fc33126

File tree

3 files changed

+353
-0
lines changed

3 files changed

+353
-0
lines changed

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ if (NOT HAVE_UNISTD_H AND NOT EXISTS ${CMAKE_CURRENT_BINARY_DIR}/unistd.h)
168168
endif()
169169

170170
set(osm2pgsql_lib_SOURCES
171+
db-copy.cpp
171172
expire-tiles.cpp
172173
geometry-processor.cpp
173174
id-tracker.cpp
@@ -197,6 +198,7 @@ set(osm2pgsql_lib_SOURCES
197198
tagtransform-c.cpp
198199
util.cpp
199200
wildcmp.cpp
201+
db-copy.hpp
200202
expire-tiles.hpp
201203
geometry-processor.hpp
202204
id-tracker.hpp

db-copy.cpp

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
#include <boost/format.hpp>
2+
#include <cassert>
3+
#include <cstdio>
4+
#include <thread>
5+
6+
#include "db-copy.hpp"
7+
#include "pgsql.hpp"
8+
9+
using fmt = boost::format;
10+
11+
db_copy_thread_t::db_copy_thread_t(std::string const &conninfo)
12+
: m_conninfo(conninfo), m_conn(nullptr)
13+
{
14+
m_worker = std::thread([this]() {
15+
try {
16+
worker_thread();
17+
} catch (std::runtime_error const &e) {
18+
fprintf(stderr, "DB writer thread failed due to ERROR: %s\n",
19+
e.what());
20+
exit(2);
21+
}
22+
});
23+
}
24+
25+
void db_copy_thread_t::add_buffer(std::unique_ptr<db_copy_buffer_t> &&buffer)
26+
{
27+
std::unique_lock<std::mutex> lock(m_queue_mutex);
28+
m_worker_queue.push_back(std::move(buffer));
29+
}
30+
31+
void db_copy_thread_t::finish()
32+
{
33+
add_buffer(std::unique_ptr<db_copy_buffer_t>());
34+
m_worker.join();
35+
}
36+
37+
void db_copy_thread_t::worker_thread()
38+
{
39+
connect();
40+
41+
for (;;) {
42+
std::unique_ptr<db_copy_buffer_t> item;
43+
{
44+
std::unique_lock<std::mutex> lock(m_queue_mutex);
45+
if (m_worker_queue.empty()) {
46+
m_queue_cond.wait(lock);
47+
continue;
48+
}
49+
50+
item = std::move(m_worker_queue.front());
51+
m_worker_queue.pop_front();
52+
}
53+
54+
if (!item)
55+
break;
56+
57+
if (item->is_copy_buffer())
58+
write_to_db(std::move(item));
59+
else
60+
execute_sql(item->buffer);
61+
}
62+
63+
if (m_inflight)
64+
finish_copy();
65+
66+
commit();
67+
}
68+
69+
void db_copy_thread_t::connect()
70+
{
71+
assert(!m_conn);
72+
73+
PGconn *conn = PQconnectdb(m_conninfo.c_str());
74+
if (PQstatus(conn) != CONNECTION_OK)
75+
throw std::runtime_error(
76+
(fmt("Connection to database failed: %1%\n") % PQerrorMessage(conn))
77+
.str());
78+
m_conn = conn;
79+
80+
// Let commits happen faster by delaying when they actually occur.
81+
pgsql_exec_simple(m_conn, PGRES_COMMAND_OK,
82+
"SET synchronous_commit TO off;");
83+
// Wrap everything into one huge transaction. XXX is that a good idea?
84+
pgsql_exec_simple(m_conn, PGRES_COMMAND_OK, "BEGIN");
85+
}
86+
87+
void db_copy_thread_t::execute_sql(std::string const &sql_cmd)
88+
{
89+
if (m_inflight)
90+
finish_copy();
91+
92+
pgsql_exec_simple(m_conn, PGRES_COMMAND_OK, sql_cmd.c_str());
93+
}
94+
95+
void db_copy_thread_t::commit()
96+
{
97+
if (!m_conn)
98+
return;
99+
100+
fprintf(stderr, "Committing transactions\n");
101+
pgsql_exec_simple(m_conn, PGRES_COMMAND_OK, "COMMIT");
102+
103+
PQfinish(m_conn);
104+
m_conn = nullptr;
105+
}
106+
107+
void db_copy_thread_t::write_to_db(std::unique_ptr<db_copy_buffer_t> &&buffer)
108+
{
109+
if (!buffer->deletables.empty() ||
110+
(m_inflight && !buffer->target->same_copy_target(*m_inflight->target)))
111+
finish_copy();
112+
113+
if (!buffer->deletables.empty())
114+
delete_rows(buffer.get());
115+
116+
start_copy(std::move(buffer));
117+
118+
pgsql_CopyData(m_inflight->target->name.c_str(), m_conn,
119+
m_inflight->buffer);
120+
}
121+
122+
void db_copy_thread_t::delete_rows(db_copy_buffer_t *buffer)
123+
{
124+
assert(!m_inflight);
125+
126+
std::string sql = "DELETE FROM ";
127+
sql.reserve(buffer->target->name.size() + buffer->deletables.size() * 15 +
128+
30);
129+
sql += buffer->target->name;
130+
sql += "WHERE ";
131+
sql += buffer->target->id;
132+
sql += " IN (";
133+
for (auto id : buffer->deletables) {
134+
sql += std::to_string(id);
135+
sql += ',';
136+
}
137+
sql += ')';
138+
139+
pgsql_exec_simple(m_conn, PGRES_COMMAND_OK, sql);
140+
}
141+
142+
void db_copy_thread_t::start_copy(std::unique_ptr<db_copy_buffer_t> &&buffer)
143+
{
144+
if (!m_inflight) {
145+
std::string copystr = "COPY ";
146+
copystr.reserve(buffer->target->name.size() +
147+
buffer->target->rows.size() + 14);
148+
copystr += buffer->target->name;
149+
if (!buffer->target->rows.empty()) {
150+
copystr += '(';
151+
copystr += buffer->target->rows;
152+
copystr += ')';
153+
}
154+
copystr += " FROM STDIN";
155+
pgsql_exec_simple(m_conn, PGRES_COPY_IN, copystr);
156+
}
157+
158+
m_inflight = std::move(buffer);
159+
}
160+
161+
void db_copy_thread_t::finish_copy()
162+
{
163+
if (PQputCopyEnd(m_conn, nullptr) != 1)
164+
throw std::runtime_error((fmt("stop COPY_END for %1% failed: %2%\n") %
165+
m_inflight->target->name %
166+
PQerrorMessage(m_conn))
167+
.str());
168+
169+
pg_result_t res(PQgetResult(m_conn));
170+
if (PQresultStatus(res.get()) != PGRES_COMMAND_OK)
171+
throw std::runtime_error((fmt("result COPY_END for %1% failed: %2%\n") %
172+
m_inflight->target->name %
173+
PQerrorMessage(m_conn))
174+
.str());
175+
176+
m_inflight.reset();
177+
}
178+
179+
db_copy_mgr_t::db_copy_mgr_t(std::shared_ptr<db_copy_thread_t> const &processor)
180+
: m_processor(processor), m_last_line(0)
181+
{}
182+
183+
void db_copy_mgr_t::new_line(std::shared_ptr<db_target_descr_t> const &table)
184+
{
185+
if (!m_current || !m_current->target->same_copy_target(*table.get())) {
186+
if (m_current) {
187+
m_processor->add_buffer(std::move(m_current));
188+
}
189+
190+
m_current.reset(new db_copy_buffer_t(table));
191+
}
192+
}
193+
194+
void db_copy_mgr_t::delete_id(osmid_t osm_id)
195+
{
196+
assert(m_current);
197+
m_current->deletables.push_back(osm_id);
198+
}
199+
200+
void db_copy_mgr_t::exec_sql(std::string const &sql_cmd)
201+
{
202+
// finish any ongoing copy operations
203+
if (m_current) {
204+
m_processor->add_buffer(std::move(m_current));
205+
}
206+
207+
// and add SQL command
208+
m_current.reset(new db_copy_buffer_t(sql_cmd));
209+
m_processor->add_buffer(std::move(m_current));
210+
}

db-copy.hpp

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
#ifndef DB_COPY_HPP
2+
#define DB_COPY_HPP
3+
4+
#include <condition_variable>
5+
#include <deque>
6+
#include <memory>
7+
#include <mutex>
8+
#include <string>
9+
#include <thread>
10+
#include <vector>
11+
12+
#include "osmtypes.hpp"
13+
14+
struct pg_conn;
15+
16+
/**
17+
* Table information necessary for building SQL queries.
18+
*/
19+
struct db_target_descr_t
20+
{
21+
/// Name of the target table for the copy operation.
22+
/// If empty, then `buffer` contains a single SQL command to execute.
23+
std::string name;
24+
/// Comma-separated list of rows for copy operation (when empty: all rows)
25+
std::string rows;
26+
/// Name of id column used when deleting objects.
27+
std::string id;
28+
29+
/**
30+
* Check if the buffer would use exactly the same copy operation.
31+
*/
32+
bool same_copy_target(db_target_descr_t const &other) const noexcept
33+
{
34+
return (this == &other) || (name == other.name && rows == other.rows);
35+
}
36+
};
37+
38+
/**
39+
* Buffer containing stuff to copy into the database.
40+
*/
41+
struct db_copy_buffer_t
42+
{
43+
/// Name of the target table for the copy operation
44+
std::shared_ptr<db_target_descr_t> target;
45+
/// Vector with object to delete before copying
46+
std::vector<osmid_t> deletables;
47+
/// actual copy buffer
48+
std::string buffer;
49+
50+
explicit db_copy_buffer_t(std::shared_ptr<db_target_descr_t> const &t)
51+
: target(t)
52+
{}
53+
54+
explicit db_copy_buffer_t(std::string const &sql_command)
55+
: buffer(sql_command)
56+
{
57+
}
58+
59+
bool is_copy_buffer() const { return !!target; }
60+
};
61+
62+
/**
63+
* The worker thread that streams copy data into the database.
64+
*/
65+
class db_copy_thread_t
66+
{
67+
public:
68+
db_copy_thread_t(std::string const &conninfo);
69+
70+
/**
71+
* Add another buffer for copying into the database.
72+
*/
73+
void add_buffer(std::unique_ptr<db_copy_buffer_t> &&buffer);
74+
75+
/**
76+
* Finish the copy process.
77+
*
78+
* Only returns when all remaining data has been committed to the
79+
* database.
80+
*/
81+
void finish();
82+
83+
private:
84+
void worker_thread();
85+
86+
void connect();
87+
void commit();
88+
89+
void execute_sql(std::string const &sql_cmd);
90+
void write_to_db(std::unique_ptr<db_copy_buffer_t> &&buffer);
91+
void start_copy(std::unique_ptr<db_copy_buffer_t> &&buffer);
92+
void finish_copy();
93+
void delete_rows(db_copy_buffer_t *buffer);
94+
95+
std::string m_conninfo;
96+
pg_conn *m_conn;
97+
98+
std::thread m_worker;
99+
std::mutex m_queue_mutex;
100+
std::condition_variable m_queue_cond;
101+
std::deque<std::unique_ptr<db_copy_buffer_t>> m_worker_queue;
102+
103+
std::unique_ptr<db_copy_buffer_t> m_inflight;
104+
};
105+
106+
/**
107+
* Management class that fills and manages copy buffers.
108+
*/
109+
class db_copy_mgr_t
110+
{
111+
public:
112+
db_copy_mgr_t(std::shared_ptr<db_copy_thread_t> const &processor);
113+
114+
/**
115+
* Start a new table row.
116+
*/
117+
void new_line(std::shared_ptr<db_target_descr_t> const &table);
118+
119+
/**
120+
* Mark an OSM object for deletion in the current table.
121+
*
122+
* The object is guaranteed to be deleted before any lines
123+
* following the delete_id() are inserted.
124+
*/
125+
void delete_id(osmid_t osm_id);
126+
127+
/**
128+
* Run an SQL statement.
129+
*
130+
* The statement is run in order. That means any previously submitted
131+
* copyblocks are finished first.
132+
*/
133+
void exec_sql(std::string const &sql_cmd);
134+
135+
private:
136+
std::shared_ptr<db_copy_thread_t> m_processor;
137+
std::unique_ptr<db_copy_buffer_t> m_current;
138+
unsigned m_last_line;
139+
};
140+
141+
#endif

0 commit comments

Comments
 (0)