Skip to content

Commit f83107d

Browse files
committed
New thread pool implementation
Change the thread pool implementation giving us several new features: * Threads are now numbered and when debug logging is used, the thread number is reported in each log line. * Each task running in the thread pool is timed. This allows for better logging where our time is spent. * When submitting a task we now return a future that can be used to access the running time of the thread. The code in the main thread can also wait for this future to make sure the background task is done. This switches the minimum C++ version needed from 11 to 14 because we need lambda capture by move.
1 parent fe75138 commit f83107d

26 files changed

+311
-55
lines changed

.github/workflows/ci.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ jobs:
114114
LUAJIT_OPTION: OFF
115115
POSTGRESQL_VERSION: 10
116116
POSTGIS_VERSION: 3
117-
CPP_VERSION: 11
117+
CPP_VERSION: 14
118118
BUILD_TYPE: Debug
119119

120120
steps:
@@ -133,7 +133,7 @@ jobs:
133133
LUAJIT_OPTION: OFF
134134
POSTGRESQL_VERSION: 11
135135
POSTGIS_VERSION: 2.5
136-
CPP_VERSION: 11
136+
CPP_VERSION: 14
137137
BUILD_TYPE: Debug
138138

139139
steps:

CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ if (NOT CMAKE_BUILD_TYPE)
4141
endif()
4242

4343
if (NOT "${CMAKE_CXX_STANDARD}")
44-
set(CMAKE_CXX_STANDARD 11)
44+
set(CMAKE_CXX_STANDARD 14)
4545
endif()
4646
message(STATUS "Building in C++${CMAKE_CXX_STANDARD} mode")
4747
set(CMAKE_CXX_EXTENSIONS OFF)
@@ -53,7 +53,7 @@ if (MSVC)
5353
else()
5454
add_definitions(-Wall)
5555
if (CMAKE_VERSION VERSION_LESS 3.1)
56-
add_definitions(-std=c++11)
56+
add_definitions(-std=c++14)
5757
endif()
5858
endif()
5959

README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,7 @@ It also requires access to a database server running
6161
[PostGIS](http://www.postgis.net/) 2.2+.
6262

6363
Make sure you have installed the development packages for the libraries
64-
mentioned in the requirements section and a C++ compiler which supports C++11.
65-
GCC 5 and later and Clang 3.5 and later are known to work.
64+
mentioned in the requirements section and a C++ compiler which supports C++14.
6665

6766
To rebuild the included man page you'll need the [pandoc](https://pandoc.org/)
6867
tool.

src/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ set(osm2pgsql_lib_SOURCES
3131
taginfo.cpp
3232
tagtransform-c.cpp
3333
tagtransform.cpp
34+
thread-pool.cpp
3435
util.cpp
3536
wildcmp.cpp
3637
)

src/flex-table.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -199,8 +199,6 @@ void table_connection_t::stop(bool updateable, bool append)
199199
return;
200200
}
201201

202-
util::timer_t timer;
203-
204202
if (table().has_geom_column()) {
205203
if (table().geom_column().needs_isvalid()) {
206204
drop_geom_check_trigger(m_db_connection.get(), table().schema(),
@@ -272,9 +270,6 @@ void table_connection_t::stop(bool updateable, bool append)
272270
log_info("Analyzing table '{}'...", table().name());
273271
analyze_table(*m_db_connection, table().schema(), table().name());
274272

275-
log_info("All postprocessing on table '{}' done in {}.", table().name(),
276-
util::human_readable_duration(timer.stop()));
277-
278273
teardown();
279274
}
280275

@@ -319,3 +314,10 @@ void table_connection_t::delete_rows_with(osmium::item_type type, osmid_t id)
319314
}
320315
m_copy_mgr.delete_object(type_to_char(type)[0], id);
321316
}
317+
318+
void table_connection_t::task_wait()
319+
{
320+
auto const run_time = m_task_result.wait();
321+
log_info("All postprocessing on table '{}' done in {}.", table().name(),
322+
util::human_readable_duration(run_time));
323+
}

src/flex-table.hpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "flex-table-column.hpp"
1515
#include "osmium-builder.hpp"
1616
#include "pgsql.hpp"
17+
#include "thread-pool.hpp"
1718

1819
#include <osmium/osm/item_type.hpp>
1920

@@ -255,6 +256,13 @@ class table_connection_t
255256

256257
geom::osmium_builder_t *get_builder() { return &m_builder; }
257258

259+
void task_set(std::future<std::chrono::milliseconds> &&future)
260+
{
261+
m_task_result.set(std::move(future));
262+
}
263+
264+
void task_wait();
265+
258266
private:
259267
geom::osmium_builder_t m_builder;
260268

@@ -271,6 +279,8 @@ class table_connection_t
271279
/// The connection to the database server.
272280
std::unique_ptr<pg_conn_t> m_db_connection;
273281

282+
task_result_t m_task_result;
283+
274284
/// Has the Id index already been created?
275285
bool m_id_index_created = false;
276286

src/logging.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
#include "logging.hpp"
1111

12+
thread_local unsigned int this_thread_num = 0;
13+
1214
/// Global logger singleton
1315
logger the_logger{};
1416

src/logging.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
#include <cstdio>
2121
#include <utility>
2222

23+
extern thread_local unsigned int this_thread_num;
24+
2325
enum class log_level
2426
{
2527
debug = 1,
@@ -49,6 +51,10 @@ class logger
4951
std::string str = fmt::format("{:%Y-%m-%d %H:%M:%S} ",
5052
fmt::localtime(std::time(nullptr)));
5153

54+
if (m_current_level == log_level::debug) {
55+
str += fmt::format(ts, "[{}] ", this_thread_num);
56+
}
57+
5258
if (prefix) {
5359
str += fmt::format(ts, "{}: ", prefix);
5460
}

src/middle-pgsql.cpp

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -98,17 +98,12 @@ void middle_pgsql_t::table_desc::build_index(std::string const &conninfo) const
9898
return;
9999
}
100100

101-
util::timer_t timer;
102-
103101
// Use a temporary connection here because we might run in a separate
104102
// thread context.
105103
pg_conn_t db_connection{conninfo};
106104

107105
log_info("Building index on table '{}'", name());
108106
db_connection.exec(m_create_fw_dep_indexes);
109-
110-
log_info("Done postprocessing on table '{}' in {}", name(),
111-
util::human_readable_duration(timer.stop()));
112107
}
113108

114109
namespace {
@@ -644,13 +639,22 @@ void middle_pgsql_t::stop(thread_pool_t &pool)
644639
} else if (!m_options->append) {
645640
// Building the indexes takes time, so do it asynchronously.
646641
for (auto &table : m_tables) {
647-
pool.submit(std::bind(&middle_pgsql_t::table_desc::build_index,
648-
&table,
649-
m_options->database_options.conninfo()));
642+
table.task_set(pool.submit(
643+
std::bind(&middle_pgsql_t::table_desc::build_index, &table,
644+
m_options->database_options.conninfo())));
650645
}
651646
}
652647
}
653648

649+
void middle_pgsql_t::wait()
650+
{
651+
for (auto &table : m_tables) {
652+
auto const run_time = table.task_wait();
653+
log_info("Done postprocessing on table '{}' in {}", table.name(),
654+
util::human_readable_duration(run_time));
655+
}
656+
}
657+
654658
static table_sql sql_for_nodes(bool create_table) noexcept
655659
{
656660
table_sql sql{};

src/middle-pgsql.hpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ struct middle_pgsql_t : public middle_t
7474
void start() override;
7575
void stop(thread_pool_t &pool) override;
7676

77+
void wait() override;
78+
7779
void node(osmium::Node const &node) override;
7880
void way(osmium::Way const &way) override;
7981
void relation(osmium::Relation const &rel) override;
@@ -115,8 +117,16 @@ struct middle_pgsql_t : public middle_t
115117
std::string m_prepare_fw_dep_lookups;
116118
std::string m_create_fw_dep_indexes;
117119

120+
void task_set(std::future<std::chrono::milliseconds> &&future)
121+
{
122+
m_task_result.set(std::move(future));
123+
}
124+
125+
std::chrono::milliseconds task_wait() { return m_task_result.wait(); }
126+
118127
private:
119128
std::shared_ptr<db_target_descr_t> m_copy_target;
129+
task_result_t m_task_result;
120130
};
121131

122132
std::shared_ptr<middle_query_t> get_query_instance() override;

0 commit comments

Comments
 (0)