Skip to content

Commit 974447f

Browse files
committed
Refactor: Move storage of db connections from flex tables to output
Each table of the flex output kept its own database connection in the table_connection_t class. With this change, the connection is kept in a vector in the flex output instead. This is a refactoring step towards using fewer database connections. As long as each table had its own connection, we can not reduce them, but if we keep them outside, we will be able to re-use the same connection for different tables.
1 parent ac6008b commit 974447f

File tree

4 files changed

+67
-70
lines changed

4 files changed

+67
-70
lines changed

src/flex-table.cpp

Lines changed: 30 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -242,14 +242,6 @@ bool flex_table_t::has_columns_with_expire() const noexcept
242242
[](auto const &column) { return column.has_expire(); });
243243
}
244244

245-
void table_connection_t::connect(connection_params_t const &connection_params)
246-
{
247-
assert(!m_db_connection);
248-
249-
m_db_connection =
250-
std::make_unique<pg_conn_t>(connection_params, "out.flex.table");
251-
}
252-
253245
static void enable_check_trigger(pg_conn_t const &db_connection,
254246
flex_table_t const &table)
255247
{
@@ -274,50 +266,46 @@ static void enable_check_trigger(pg_conn_t const &db_connection,
274266
checks);
275267
}
276268

277-
void table_connection_t::start(bool append)
269+
void table_connection_t::start(pg_conn_t const &db_connection, bool append)
278270
{
279-
assert(m_db_connection);
280-
281271
if (!append) {
282-
m_db_connection->exec("DROP TABLE IF EXISTS {} CASCADE",
283-
table().full_name());
272+
db_connection.exec("DROP TABLE IF EXISTS {} CASCADE",
273+
table().full_name());
284274
}
285275

286276
// These _tmp tables can be left behind if we run out of disk space.
287-
m_db_connection->exec("DROP TABLE IF EXISTS {}", table().full_tmp_name());
277+
db_connection.exec("DROP TABLE IF EXISTS {}", table().full_tmp_name());
288278

289279
if (!append) {
290-
m_db_connection->exec(table().build_sql_create_table(
280+
db_connection.exec(table().build_sql_create_table(
291281
table().cluster_by_geom() ? flex_table_t::table_type::interim
292282
: flex_table_t::table_type::permanent,
293283
table().full_name()));
294284

295-
enable_check_trigger(*m_db_connection, table());
285+
enable_check_trigger(db_connection, table());
296286
}
297287

298-
prepare();
288+
prepare(db_connection);
299289
}
300290

301-
void table_connection_t::stop(bool updateable, bool append)
291+
void table_connection_t::stop(pg_conn_t const &db_connection, bool updateable,
292+
bool append)
302293
{
303-
assert(m_db_connection);
304-
305294
m_copy_mgr.sync();
306295

307296
if (append) {
308-
teardown();
309297
return;
310298
}
311299

312300
if (table().cluster_by_geom()) {
313301
if (table().geom_column().needs_isvalid()) {
314-
drop_geom_check_trigger(*m_db_connection, table().schema(),
302+
drop_geom_check_trigger(db_connection, table().schema(),
315303
table().name());
316304
}
317305

318306
log_info("Clustering table '{}' by geometry...", table().name());
319307

320-
m_db_connection->exec(table().build_sql_create_table(
308+
db_connection.exec(table().build_sql_create_table(
321309
flex_table_t::table_type::permanent, table().full_tmp_name()));
322310

323311
std::string const columns = table().build_sql_column_list();
@@ -350,15 +338,15 @@ void table_connection_t::stop(bool updateable, bool append)
350338
sql += geom_column_name;
351339
}
352340

353-
m_db_connection->exec(sql);
341+
db_connection.exec(sql);
354342

355-
m_db_connection->exec("DROP TABLE {}", table().full_name());
356-
m_db_connection->exec(R"(ALTER TABLE {} RENAME TO "{}")",
357-
table().full_tmp_name(), table().name());
343+
db_connection.exec("DROP TABLE {}", table().full_name());
344+
db_connection.exec(R"(ALTER TABLE {} RENAME TO "{}")",
345+
table().full_tmp_name(), table().name());
358346
m_id_index_created = false;
359347

360348
if (updateable) {
361-
enable_check_trigger(*m_db_connection, table());
349+
enable_check_trigger(db_connection, table());
362350
}
363351
}
364352

@@ -370,57 +358,53 @@ void table_connection_t::stop(bool updateable, bool append)
370358
index.columns());
371359
auto const sql = index.create_index(
372360
qualified_name(table().schema(), table().name()));
373-
m_db_connection->exec(sql);
361+
db_connection.exec(sql);
374362
}
375363
}
376364

377365
if ((table().always_build_id_index() || updateable) &&
378366
table().has_id_column()) {
379-
create_id_index();
367+
create_id_index(db_connection);
380368
}
381369

382370
log_info("Analyzing table '{}'...", table().name());
383-
analyze();
384-
385-
teardown();
371+
analyze(db_connection);
386372
}
387373

388-
void table_connection_t::prepare()
374+
void table_connection_t::prepare(pg_conn_t const &db_connection)
389375
{
390-
assert(m_db_connection);
391376
if (table().has_id_column() && table().has_columns_with_expire()) {
392-
m_db_connection->exec(table().build_sql_prepare_get_wkb());
377+
db_connection.exec(table().build_sql_prepare_get_wkb());
393378
}
394379
}
395380

396-
void table_connection_t::analyze()
381+
void table_connection_t::analyze(pg_conn_t const &db_connection)
397382
{
398-
analyze_table(*m_db_connection, table().schema(), table().name());
383+
analyze_table(db_connection, table().schema(), table().name());
399384
}
400385

401-
void table_connection_t::create_id_index()
386+
void table_connection_t::create_id_index(pg_conn_t const &db_connection)
402387
{
403388
if (m_id_index_created) {
404389
log_debug("Id index on table '{}' already created.", table().name());
405390
} else {
406391
log_info("Creating id index on table '{}'...", table().name());
407-
m_db_connection->exec(table().build_sql_create_id_index());
392+
db_connection.exec(table().build_sql_create_id_index());
408393
m_id_index_created = true;
409394
}
410395
}
411396

412-
pg_result_t table_connection_t::get_geoms_by_id(osmium::item_type type,
397+
pg_result_t table_connection_t::get_geoms_by_id(pg_conn_t const &db_connection,
398+
osmium::item_type type,
413399
osmid_t id) const
414400
{
415401
assert(table().has_geom_column());
416-
assert(m_db_connection);
417-
418402
std::string const stmt = fmt::format("get_wkb_{}", table().num());
419403
if (table().has_multicolumn_id_index()) {
420-
return m_db_connection->exec_prepared_as_binary(stmt.c_str(),
421-
type_to_char(type), id);
404+
return db_connection.exec_prepared_as_binary(stmt.c_str(),
405+
type_to_char(type), id);
422406
}
423-
return m_db_connection->exec_prepared_as_binary(stmt.c_str(), id);
407+
return db_connection.exec_prepared_as_binary(stmt.c_str(), id);
424408
}
425409

426410
void table_connection_t::delete_rows_with(osmium::item_type type, osmid_t id)

src/flex-table.hpp

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -260,31 +260,28 @@ class table_connection_t
260260
m_target(std::make_shared<db_target_descr_t>(
261261
table->schema(), table->name(), table->id_column_names(),
262262
table->build_sql_column_list())),
263-
m_copy_mgr(copy_thread), m_db_connection(nullptr)
263+
m_copy_mgr(copy_thread)
264264
{
265265
}
266266

267-
void connect(connection_params_t const &connection_params);
267+
void start(pg_conn_t const &db_connection, bool append);
268268

269-
void start(bool append);
270-
271-
void stop(bool updateable, bool append);
269+
void stop(pg_conn_t const &db_connection, bool updateable, bool append);
272270

273271
flex_table_t const &table() const noexcept { return *m_table; }
274272

275-
void teardown() { m_db_connection.reset(); }
276-
277-
void prepare();
273+
void prepare(pg_conn_t const &db_connection);
278274

279-
void analyze();
275+
void analyze(pg_conn_t const &db_connection);
280276

281-
void create_id_index();
277+
void create_id_index(pg_conn_t const &db_connection);
282278

283279
/**
284280
* Get all geometries that have at least one expire config defined
285281
* from the database and return the result set.
286282
*/
287-
pg_result_t get_geoms_by_id(osmium::item_type type, osmid_t id) const;
283+
pg_result_t get_geoms_by_id(pg_conn_t const &db_connection,
284+
osmium::item_type type, osmid_t id) const;
288285

289286
void flush() { m_copy_mgr.flush(); }
290287

@@ -332,9 +329,6 @@ class table_connection_t
332329
*/
333330
db_copy_mgr_t<db_deleter_by_type_and_id_t> m_copy_mgr;
334331

335-
/// The connection to the database server.
336-
std::unique_ptr<pg_conn_t> m_db_connection;
337-
338332
task_result_t m_task_result;
339333

340334
std::size_t m_count_insert = 0;

src/output-flex.cpp

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,9 +1075,12 @@ void output_flex_t::after_ways()
10751075
void output_flex_t::stop()
10761076
{
10771077
for (auto &table : m_table_connections) {
1078-
table.task_set(thread_pool().submit([&]() {
1079-
table.stop(get_options()->slim && !get_options()->droptemp,
1078+
auto *db_connection = &m_db_connections.at(table.table().num());
1079+
table.task_set(thread_pool().submit([&, db_connection]() {
1080+
table.stop(*db_connection,
1081+
get_options()->slim && !get_options()->droptemp,
10801082
get_options()->append);
1083+
db_connection->close();
10811084
}));
10821085
}
10831086

@@ -1151,13 +1154,15 @@ void output_flex_t::relation_add(osmium::Relation const &relation)
11511154
}
11521155

11531156
void output_flex_t::delete_from_table(table_connection_t *table_connection,
1157+
pg_conn_t const &db_connection,
11541158
osmium::item_type type, osmid_t osm_id)
11551159
{
11561160
assert(table_connection);
11571161
auto const id = table_connection->table().map_id(type, osm_id);
11581162

11591163
if (table_connection->table().has_columns_with_expire()) {
1160-
auto const result = table_connection->get_geoms_by_id(type, id);
1164+
auto const result =
1165+
table_connection->get_geoms_by_id(db_connection, type, id);
11611166
auto const num_tuples = result.num_tuples();
11621167
if (num_tuples > 0) {
11631168
int col = 0;
@@ -1180,7 +1185,8 @@ void output_flex_t::delete_from_tables(osmium::item_type type, osmid_t osm_id)
11801185
{
11811186
for (auto &table : m_table_connections) {
11821187
if (table.table().matches_type(type) && table.table().has_id_column()) {
1183-
delete_from_table(&table, type, osm_id);
1188+
delete_from_table(&table, m_db_connections.at(table.table().num()),
1189+
type, osm_id);
11841190
}
11851191
}
11861192
}
@@ -1224,9 +1230,12 @@ void output_flex_t::relation_modify(osmium::Relation const &rel)
12241230

12251231
void output_flex_t::start()
12261232
{
1233+
assert(m_db_connections.empty());
1234+
12271235
for (auto &table : m_table_connections) {
1228-
table.connect(get_options()->connection_params);
1229-
table.start(get_options()->append);
1236+
m_db_connections.emplace_back(get_options()->connection_params,
1237+
"out.flex.table");
1238+
table.start(m_db_connections.back(), get_options()->append);
12301239
}
12311240
}
12321241

@@ -1260,10 +1269,13 @@ output_flex_t::output_flex_t(output_flex_t const *other,
12601269
m_process_relation(other->m_process_relation),
12611270
m_select_relation_members(other->m_select_relation_members)
12621271
{
1272+
assert(m_db_connections.empty());
1273+
12631274
for (auto &table : *m_tables) {
12641275
auto &tc = m_table_connections.emplace_back(&table, m_copy_thread);
1265-
tc.connect(get_options()->connection_params);
1266-
tc.prepare();
1276+
m_db_connections.emplace_back(get_options()->connection_params,
1277+
"out.flex.table");
1278+
tc.prepare(m_db_connections.back());
12671279
}
12681280

12691281
for (auto &expire_output : *m_expire_outputs) {
@@ -1509,8 +1521,10 @@ void output_flex_t::reprocess_marked()
15091521
for (auto &table : m_table_connections) {
15101522
if (table.table().matches_type(osmium::item_type::way) &&
15111523
table.table().has_id_column()) {
1512-
table.analyze();
1513-
table.create_id_index();
1524+
auto const &db_connection =
1525+
m_db_connections.at(table.table().num());
1526+
table.analyze(db_connection);
1527+
table.create_id_index(db_connection);
15141528
}
15151529
}
15161530

src/output-flex.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,9 @@ class output_flex_t : public output_t
224224
void add_row(table_connection_t *table_connection, OBJECT const &object);
225225

226226
void delete_from_table(table_connection_t *table_connection,
227+
pg_conn_t const &db_connection,
227228
osmium::item_type type, osmid_t osm_id);
229+
228230
void delete_from_tables(osmium::item_type type, osmid_t osm_id);
229231

230232
lua_State *lua_state() noexcept { return m_lua_state.get(); }
@@ -288,6 +290,9 @@ class output_flex_t : public output_t
288290

289291
std::vector<table_connection_t> m_table_connections;
290292

293+
/// The connections to the database server for each table.
294+
std::vector<pg_conn_t> m_db_connections;
295+
291296
// This is shared between all clones of the output and must only be
292297
// accessed while protected using the lua_mutex.
293298
std::shared_ptr<idset_t> m_stage2_way_ids = std::make_shared<idset_t>();

0 commit comments

Comments
 (0)