Skip to content

Commit c7ab81c

Browse files
authored
Merge pull request #1359 from joto/middle-pgsql-cleanup
Various cleanups in middle-pgsql
2 parents 1c43ca6 + 7d10dca commit c7ab81c

File tree

3 files changed

+77
-48
lines changed

3 files changed

+77
-48
lines changed

src/middle-pgsql.cpp

Lines changed: 48 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -85,24 +85,34 @@ void middle_query_pgsql_t::exec_sql(std::string const &sql_cmd) const
8585
m_sql_conn.exec(sql_cmd);
8686
}
8787

88-
void middle_pgsql_t::table_desc::stop(std::string const &conninfo,
89-
bool droptemp, bool build_indexes)
88+
void middle_pgsql_t::table_desc::drop_table(
89+
pg_conn_t const &db_connection) const
9090
{
9191
util::timer_t timer;
9292

93+
log_info("Dropping table '{}'", name());
94+
95+
auto const qual_name = qualified_name(schema(), name());
96+
db_connection.exec("DROP TABLE IF EXISTS {}"_format(qual_name));
97+
98+
log_info("Done postprocessing on table '{}' in {}", name(),
99+
util::human_readable_duration(timer.stop()));
100+
}
101+
102+
void middle_pgsql_t::table_desc::build_index(std::string const &conninfo) const
103+
{
104+
if (m_create_fw_dep_indexes.empty()) {
105+
return;
106+
}
107+
108+
util::timer_t timer;
109+
93110
// Use a temporary connection here because we might run in a separate
94111
// thread context.
95-
pg_conn_t sql_conn{conninfo};
112+
pg_conn_t db_connection{conninfo};
96113

97-
if (droptemp) {
98-
log_info("Dropping table '{}'", name());
99-
auto const qual_name = qualified_name(
100-
m_copy_target->schema, m_copy_target->name);
101-
sql_conn.exec("DROP TABLE IF EXISTS {}"_format(qual_name));
102-
} else if (build_indexes && !m_create_fw_dep_indexes.empty()) {
103-
log_info("Building index on table '{}'", name());
104-
sql_conn.exec(m_create_fw_dep_indexes);
105-
}
114+
log_info("Building index on table '{}'", name());
115+
db_connection.exec(m_create_fw_dep_indexes);
106116

107117
log_info("Done postprocessing on table '{}' in {}", name(),
108118
util::human_readable_duration(timer.stop()));
@@ -309,10 +319,10 @@ void middle_pgsql_t::node_set(osmium::Node const &node)
309319
{
310320
m_cache->set(node.id(), node.location());
311321

312-
if (m_out_options->flat_node_cache_enabled) {
322+
if (m_options->flat_node_cache_enabled) {
313323
m_persistent_cache->set(node.id(), node.location());
314324
} else {
315-
m_db_copy.new_line(m_tables[NODE_TABLE].m_copy_target);
325+
m_db_copy.new_line(m_tables[NODE_TABLE].copy_target());
316326

317327
m_db_copy.add_columns(node.id(), node.location().y(),
318328
node.location().x());
@@ -329,12 +339,12 @@ size_t middle_query_pgsql_t::nodes_get_list(osmium::WayNodeList *nodes) const
329339

330340
void middle_pgsql_t::node_delete(osmid_t osm_id)
331341
{
332-
assert(m_append);
342+
assert(m_options->append);
333343

334-
if (m_out_options->flat_node_cache_enabled) {
344+
if (m_options->flat_node_cache_enabled) {
335345
m_persistent_cache->set(osm_id, osmium::Location{});
336346
} else {
337-
m_db_copy.new_line(m_tables[NODE_TABLE].m_copy_target);
347+
m_db_copy.new_line(m_tables[NODE_TABLE].copy_target());
338348
m_db_copy.delete_object(osm_id);
339349
}
340350
}
@@ -367,7 +377,7 @@ idlist_t middle_pgsql_t::get_rels_by_way(osmid_t osm_id)
367377

368378
void middle_pgsql_t::way_set(osmium::Way const &way)
369379
{
370-
m_db_copy.new_line(m_tables[WAY_TABLE].m_copy_target);
380+
m_db_copy.new_line(m_tables[WAY_TABLE].copy_target());
371381

372382
m_db_copy.add_column(way.id());
373383

@@ -378,7 +388,7 @@ void middle_pgsql_t::way_set(osmium::Way const &way)
378388
}
379389
m_db_copy.finish_array();
380390

381-
buffer_store_tags(way, m_out_options->extra_attributes);
391+
buffer_store_tags(way, m_options->extra_attributes);
382392

383393
m_db_copy.finish_line();
384394
}
@@ -460,8 +470,8 @@ middle_query_pgsql_t::rel_way_members_get(osmium::Relation const &rel,
460470

461471
void middle_pgsql_t::way_delete(osmid_t osm_id)
462472
{
463-
assert(m_append);
464-
m_db_copy.new_line(m_tables[WAY_TABLE].m_copy_target);
473+
assert(m_options->append);
474+
m_db_copy.new_line(m_tables[WAY_TABLE].copy_target());
465475
m_db_copy.delete_object(osm_id);
466476
}
467477

@@ -474,7 +484,7 @@ void middle_pgsql_t::relation_set(osmium::Relation const &rel)
474484
parts[osmium::item_type_to_nwr_index(m.type())].push_back(m.ref());
475485
}
476486

477-
m_db_copy.new_line(m_tables[REL_TABLE].m_copy_target);
487+
m_db_copy.new_line(m_tables[REL_TABLE].copy_target());
478488

479489
// id, way offset, relation offset
480490
m_db_copy.add_columns(rel.id(), parts[0].size(),
@@ -503,7 +513,7 @@ void middle_pgsql_t::relation_set(osmium::Relation const &rel)
503513
}
504514

505515
// tags
506-
buffer_store_tags(rel, m_out_options->extra_attributes);
516+
buffer_store_tags(rel, m_options->extra_attributes);
507517

508518
m_db_copy.finish_line();
509519
}
@@ -533,9 +543,9 @@ bool middle_query_pgsql_t::relation_get(osmid_t id,
533543

534544
void middle_pgsql_t::relation_delete(osmid_t osm_id)
535545
{
536-
assert(m_append);
546+
assert(m_options->append);
537547

538-
m_db_copy.new_line(m_tables[REL_TABLE].m_copy_target);
548+
m_db_copy.new_line(m_tables[REL_TABLE].copy_target());
539549
m_db_copy.delete_object(osm_id);
540550
}
541551

@@ -559,7 +569,7 @@ middle_query_pgsql_t::middle_query_pgsql_t(
559569

560570
void middle_pgsql_t::start()
561571
{
562-
if (m_append) {
572+
if (m_options->append) {
563573
// Disable JIT and parallel workers as they are known to cause
564574
// problems when accessing the intarrays.
565575
m_db_connection.set_config("jit_above_cost", "-1");
@@ -572,19 +582,14 @@ void middle_pgsql_t::start()
572582
}
573583
}
574584
} else {
575-
// (Re)create tables.
576585
m_db_connection.exec("SET client_min_messages = WARNING");
577586
for (auto const &table : m_tables) {
578587
log_debug("Setting up table '{}'", table.name());
579-
auto const qual_name = qualified_name(
580-
table.m_copy_target->schema, table.m_copy_target->name);
588+
auto const qual_name = qualified_name(table.schema(), table.name());
581589
m_db_connection.exec(
582590
"DROP TABLE IF EXISTS {} CASCADE"_format(qual_name));
583591
m_db_connection.exec(table.m_create_table);
584592
}
585-
586-
// The extra query connection is only needed in append mode, so close.
587-
m_db_connection.close();
588593
}
589594
}
590595

@@ -600,22 +605,22 @@ void middle_pgsql_t::flush() { m_db_copy.sync(); }
600605
void middle_pgsql_t::stop(thread_pool_t &pool)
601606
{
602607
m_cache.reset();
603-
if (m_out_options->flat_node_cache_enabled) {
608+
if (m_options->flat_node_cache_enabled) {
604609
m_persistent_cache.reset();
605610
}
606611

607-
if (m_out_options->droptemp) {
612+
if (m_options->droptemp) {
608613
// Dropping the tables is fast, so do it synchronously to guarantee
609614
// that the space is freed before creating the other indices.
610615
for (auto &table : m_tables) {
611-
table.stop(m_out_options->database_options.conninfo(),
612-
m_out_options->droptemp, !m_append);
616+
table.drop_table(m_db_connection);
613617
}
614-
} else {
618+
} else if (!m_options->append) {
619+
// Building the indexes takes time, so do it asynchronously.
615620
for (auto &table : m_tables) {
616-
pool.submit(std::bind(&middle_pgsql_t::table_desc::stop, &table,
617-
m_out_options->database_options.conninfo(),
618-
m_out_options->droptemp, !m_append));
621+
pool.submit(std::bind(&middle_pgsql_t::table_desc::build_index,
622+
&table,
623+
m_options->database_options.conninfo()));
619624
}
620625
}
621626
}
@@ -743,10 +748,10 @@ static bool check_bucket_index(pg_conn_t *db_connection,
743748
}
744749

745750
middle_pgsql_t::middle_pgsql_t(options_t const *options)
746-
: m_append(options->append), m_out_options(options),
751+
: m_options(options),
747752
m_cache(new node_ram_cache{options->alloc_chunkwise | ALLOC_LOSSY,
748753
options->cache}),
749-
m_db_connection(m_out_options->database_options.conninfo()),
754+
m_db_connection(m_options->database_options.conninfo()),
750755
m_copy_thread(
751756
std::make_shared<db_copy_thread_t>(options->database_options.conninfo())),
752757
m_db_copy(m_copy_thread)
@@ -779,7 +784,7 @@ middle_pgsql_t::get_query_instance()
779784
// NOTE: this is thread safe for use in pending async processing only because
780785
// during that process they are only read from
781786
std::unique_ptr<middle_query_pgsql_t> mid{
782-
new middle_query_pgsql_t{m_out_options->database_options.conninfo(),
787+
new middle_query_pgsql_t{m_options->database_options.conninfo(),
783788
m_cache, m_persistent_cache}};
784789

785790
// We use a connection per table to enable the use of COPY

src/middle-pgsql.hpp

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,16 +82,30 @@ struct middle_pgsql_t : public slim_middle_t
8282
table_desc() = default;
8383
table_desc(options_t const &options, table_sql const &ts);
8484

85-
char const *name() const { return m_copy_target->name.c_str(); }
85+
std::string const &schema() const noexcept
86+
{
87+
return m_copy_target->schema;
88+
}
8689

87-
void stop(std::string const &conninfo, bool droptemp,
88-
bool build_indexes);
90+
std::string const &name() const noexcept { return m_copy_target->name; }
91+
92+
std::shared_ptr<db_target_descr_t> const &copy_target() const noexcept
93+
{
94+
return m_copy_target;
95+
}
96+
97+
///< Drop table from database using existing database connection.
98+
void drop_table(pg_conn_t const &db_connection) const;
99+
100+
///< Open a new database connection and build index on this table.
101+
void build_index(std::string const &conninfo) const;
89102

90103
std::string m_create_table;
91104
std::string m_prepare_query;
92105
std::string m_prepare_fw_dep_lookups;
93106
std::string m_create_fw_dep_indexes;
94107

108+
private:
95109
std::shared_ptr<db_target_descr_t> m_copy_target;
96110
};
97111

@@ -112,8 +126,7 @@ struct middle_pgsql_t : public slim_middle_t
112126

113127
table_desc m_tables[NUM_TABLES];
114128

115-
bool m_append;
116-
options_t const *m_out_options;
129+
options_t const *m_options;
117130

118131
std::shared_ptr<node_ram_cache> m_cache;
119132
std::shared_ptr<node_persistent_cache> m_persistent_cache;

src/pgsql.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "util.hpp"
66

77
#include <array>
8+
#include <cassert>
89
#include <cstdarg>
910
#include <cstdio>
1011

@@ -22,11 +23,15 @@ pg_conn_t::pg_conn_t(std::string const &conninfo)
2223

2324
char const *pg_conn_t::error_msg() const noexcept
2425
{
26+
assert(m_conn);
27+
2528
return PQerrorMessage(m_conn.get());
2629
}
2730

2831
pg_result_t pg_conn_t::query(ExecStatusType expect, char const *sql) const
2932
{
33+
assert(m_conn);
34+
3035
log_sql("{}", sql);
3136
pg_result_t res{PQexec(m_conn.get(), sql)};
3237
if (PQresultStatus(res.get()) != expect) {
@@ -69,6 +74,8 @@ void pg_conn_t::exec(std::string const &sql) const
6974
void pg_conn_t::copy_data(std::string const &sql,
7075
std::string const &context) const
7176
{
77+
assert(m_conn);
78+
7279
log_sql_data("Copy data to '{}':\n{}", context, sql);
7380
int const r = PQputCopyData(m_conn.get(), sql.c_str(), (int)sql.size());
7481

@@ -97,6 +104,8 @@ void pg_conn_t::copy_data(std::string const &sql,
97104

98105
void pg_conn_t::end_copy(std::string const &context) const
99106
{
107+
assert(m_conn);
108+
100109
if (PQputCopyEnd(m_conn.get(), nullptr) != 1) {
101110
throw std::runtime_error{"Ending COPY mode for '{}' failed: {}."_format(
102111
context, error_msg())};
@@ -113,6 +122,8 @@ pg_result_t
113122
pg_conn_t::exec_prepared_internal(char const *stmt, int num_params,
114123
char const *const *param_values) const
115124
{
125+
assert(m_conn);
126+
116127
if (get_logger().log_sql()) {
117128
std::string params;
118129
for (int i = 0; i < num_params; ++i) {

0 commit comments

Comments
 (0)