Skip to content

Commit 7d10dca

Browse files
committed
Simplify code around multi-threaded index creation
1 parent 8844a3b commit 7d10dca

File tree

2 files changed

+33
-23
lines changed

2 files changed

+33
-23
lines changed

src/middle-pgsql.cpp

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -85,23 +85,34 @@ void middle_query_pgsql_t::exec_sql(std::string const &sql_cmd) const
8585
m_sql_conn.exec(sql_cmd);
8686
}
8787

88-
void middle_pgsql_t::table_desc::stop(std::string const &conninfo,
89-
bool droptemp, bool build_indexes)
88+
void middle_pgsql_t::table_desc::drop_table(
89+
pg_conn_t const &db_connection) const
9090
{
9191
util::timer_t timer;
9292

93+
log_info("Dropping table '{}'", name());
94+
95+
auto const qual_name = qualified_name(schema(), name());
96+
db_connection.exec("DROP TABLE IF EXISTS {}"_format(qual_name));
97+
98+
log_info("Done postprocessing on table '{}' in {}", name(),
99+
util::human_readable_duration(timer.stop()));
100+
}
101+
102+
void middle_pgsql_t::table_desc::build_index(std::string const &conninfo) const
103+
{
104+
if (m_create_fw_dep_indexes.empty()) {
105+
return;
106+
}
107+
108+
util::timer_t timer;
109+
93110
// Use a temporary connection here because we might run in a separate
94111
// thread context.
95-
pg_conn_t sql_conn{conninfo};
112+
pg_conn_t db_connection{conninfo};
96113

97-
if (droptemp) {
98-
log_info("Dropping table '{}'", name());
99-
auto const qual_name = qualified_name(schema(), name());
100-
sql_conn.exec("DROP TABLE IF EXISTS {}"_format(qual_name));
101-
} else if (build_indexes && !m_create_fw_dep_indexes.empty()) {
102-
log_info("Building index on table '{}'", name());
103-
sql_conn.exec(m_create_fw_dep_indexes);
104-
}
114+
log_info("Building index on table '{}'", name());
115+
db_connection.exec(m_create_fw_dep_indexes);
105116

106117
log_info("Done postprocessing on table '{}' in {}", name(),
107118
util::human_readable_duration(timer.stop()));
@@ -571,7 +582,6 @@ void middle_pgsql_t::start()
571582
}
572583
}
573584
} else {
574-
// (Re)create tables.
575585
m_db_connection.exec("SET client_min_messages = WARNING");
576586
for (auto const &table : m_tables) {
577587
log_debug("Setting up table '{}'", table.name());
@@ -580,9 +590,6 @@ void middle_pgsql_t::start()
580590
"DROP TABLE IF EXISTS {} CASCADE"_format(qual_name));
581591
m_db_connection.exec(table.m_create_table);
582592
}
583-
584-
// The extra query connection is only needed in append mode, so close.
585-
m_db_connection.close();
586593
}
587594
}
588595

@@ -606,14 +613,14 @@ void middle_pgsql_t::stop(thread_pool_t &pool)
606613
// Dropping the tables is fast, so do it synchronously to guarantee
607614
// that the space is freed before creating the other indices.
608615
for (auto &table : m_tables) {
609-
table.stop(m_options->database_options.conninfo(),
610-
m_options->droptemp, !m_options->append);
616+
table.drop_table(m_db_connection);
611617
}
612-
} else {
618+
} else if (!m_options->append) {
619+
// Building the indexes takes time, so do it asynchronously.
613620
for (auto &table : m_tables) {
614-
pool.submit(std::bind(&middle_pgsql_t::table_desc::stop, &table,
615-
m_options->database_options.conninfo(),
616-
m_options->droptemp, !m_options->append));
621+
pool.submit(std::bind(&middle_pgsql_t::table_desc::build_index,
622+
&table,
623+
m_options->database_options.conninfo()));
617624
}
618625
}
619626
}

src/middle-pgsql.hpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,11 @@ struct middle_pgsql_t : public slim_middle_t
9494
return m_copy_target;
9595
}
9696

97-
void stop(std::string const &conninfo, bool droptemp,
98-
bool build_indexes);
97+
///< Drop table from database using existing database connection.
98+
void drop_table(pg_conn_t const &db_connection) const;
99+
100+
///< Open a new database connection and build index on this table.
101+
void build_index(std::string const &conninfo) const;
99102

100103
std::string m_create_table;
101104
std::string m_prepare_query;

0 commit comments

Comments
 (0)