Skip to content

Commit 54dbe2d

Browse files
committed
Open thread pool earlier and make it a member of middle_t and output_t
This way we can access it from everywhere from the start on. This is not used yet, but can be used to run processing asynchronously before we are getting to the stop() phase.
1 parent aca7fec commit 54dbe2d

22 files changed

+175
-112
lines changed

src/dependency-manager.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
#include <cassert>
1818
#include <memory>
1919

20-
struct middle_t;
20+
class middle_t;
2121

2222
/**
2323
* The job of the dependency manager is to keep track of the dependencies

src/middle-pgsql.cpp

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -622,7 +622,7 @@ void middle_pgsql_t::start()
622622
}
623623
}
624624

625-
void middle_pgsql_t::stop(thread_pool_t &pool)
625+
void middle_pgsql_t::stop()
626626
{
627627
m_cache.reset();
628628
if (!m_options->flat_node_file.empty()) {
@@ -638,7 +638,7 @@ void middle_pgsql_t::stop(thread_pool_t &pool)
638638
} else if (!m_options->append) {
639639
// Building the indexes takes time, so do it asynchronously.
640640
for (auto &table : m_tables) {
641-
table.task_set(pool.submit(
641+
table.task_set(thread_pool().submit(
642642
std::bind(&middle_pgsql_t::table_desc::build_index, &table,
643643
m_options->database_options.conninfo())));
644644
}
@@ -779,8 +779,9 @@ static bool check_bucket_index(pg_conn_t *db_connection,
779779
return res.num_tuples() > 0;
780780
}
781781

782-
middle_pgsql_t::middle_pgsql_t(options_t const *options)
783-
: m_options(options),
782+
middle_pgsql_t::middle_pgsql_t(std::shared_ptr<thread_pool_t> thread_pool,
783+
options_t const *options)
784+
: middle_t(std::move(thread_pool)), m_options(options),
784785
m_cache(new node_locations_t{static_cast<std::size_t>(options->cache) *
785786
1024UL * 1024UL}),
786787
m_db_connection(m_options->database_options.conninfo()),

src/middle-pgsql.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,11 @@ struct table_sql {
6969

7070
struct middle_pgsql_t : public middle_t
7171
{
72-
explicit middle_pgsql_t(options_t const *options);
72+
middle_pgsql_t(std::shared_ptr<thread_pool_t> thread_pool,
73+
options_t const *options);
7374

7475
void start() override;
75-
void stop(thread_pool_t &pool) override;
76+
void stop() override;
7677

7778
void wait() override;
7879

src/middle-ram.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
#include <limits>
2828
#include <memory>
2929

30-
middle_ram_t::middle_ram_t(options_t const *options)
30+
middle_ram_t::middle_ram_t(std::shared_ptr<thread_pool_t> thread_pool,
31+
options_t const *options)
32+
: middle_t(std::move(thread_pool))
3133
{
3234
assert(options);
3335

@@ -60,7 +62,7 @@ void middle_ram_t::set_requirements(output_requirements const &requirements)
6062
log_debug(" relations: {}", m_store_options.relations);
6163
}
6264

63-
void middle_ram_t::stop(thread_pool_t &)
65+
void middle_ram_t::stop()
6466
{
6567
auto const mbyte = 1024 * 1024;
6668

src/middle-ram.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,13 @@ class options_t;
4141
class middle_ram_t : public middle_t, public middle_query_t
4242
{
4343
public:
44-
explicit middle_ram_t(options_t const *options);
44+
middle_ram_t(std::shared_ptr<thread_pool_t> thread_pool,
45+
options_t const *options);
4546

4647
~middle_ram_t() noexcept override = default;
4748

4849
void start() override {}
49-
void stop(thread_pool_t &) override;
50+
void stop() override;
5051

5152
void node(osmium::Node const &node) override;
5253
void way(osmium::Way const &way) override;

src/middle.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,15 @@
1212
#include "middle.hpp"
1313
#include "options.hpp"
1414

15-
std::shared_ptr<middle_t> create_middle(options_t const &options)
15+
std::shared_ptr<middle_t>
16+
create_middle(std::shared_ptr<thread_pool_t> thread_pool,
17+
options_t const &options)
1618
{
1719
if (options.slim) {
18-
return std::make_shared<middle_pgsql_t>(&options);
20+
return std::make_shared<middle_pgsql_t>(std::move(thread_pool),
21+
&options);
1922
}
2023

21-
return std::make_shared<middle_ram_t>(&options);
24+
return std::make_shared<middle_ram_t>(std::move(thread_pool), &options);
2225
}
2326

src/middle.hpp

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,17 @@ inline middle_query_t::~middle_query_t() = default;
8181
* Interface for storing "raw" OSM data in an intermediate object store and
8282
* getting it back.
8383
*/
84-
struct middle_t
84+
class middle_t
8585
{
86+
public:
87+
explicit middle_t(std::shared_ptr<thread_pool_t> thread_pool)
88+
: m_thread_pool(std::move(thread_pool))
89+
{}
90+
8691
virtual ~middle_t() = 0;
8792

8893
virtual void start() = 0;
89-
virtual void stop(thread_pool_t &pool) = 0;
94+
virtual void stop() = 0;
9095

9196
virtual void wait() {}
9297

@@ -115,11 +120,23 @@ struct middle_t
115120
virtual std::shared_ptr<middle_query_t> get_query_instance() = 0;
116121

117122
virtual void set_requirements(output_requirements const &) {}
118-
};
123+
124+
protected:
125+
thread_pool_t &thread_pool() const noexcept
126+
{
127+
assert(m_thread_pool);
128+
return *m_thread_pool;
129+
}
130+
131+
private:
132+
std::shared_ptr<thread_pool_t> m_thread_pool;
133+
}; // class middle_t
119134

120135
inline middle_t::~middle_t() = default;
121136

122137
/// Factory function: Instantiate the middle based on the command line options.
123-
std::shared_ptr<middle_t> create_middle(options_t const &options);
138+
std::shared_ptr<middle_t>
139+
create_middle(std::shared_ptr<thread_pool_t> thread_pool,
140+
options_t const &options);
124141

125142
#endif // OSM2PGSQL_MIDDLE_HPP

src/osm2pgsql.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,15 @@ static void run(options_t const &options)
2929
auto const files = prepare_input_files(
3030
options.input_files, options.input_format, options.append);
3131

32-
auto middle = create_middle(options);
32+
auto thread_pool = std::make_shared<thread_pool_t>(
33+
options.parallel_indexing ? options.num_procs : 1U);
34+
log_debug("Started pool with {} threads.", thread_pool->num_threads());
35+
36+
auto middle = create_middle(thread_pool, options);
3337
middle->start();
3438

35-
auto output =
36-
output_t::create_output(middle->get_query_instance(), options);
39+
auto output = output_t::create_output(middle->get_query_instance(),
40+
thread_pool, options);
3741

3842
middle->set_requirements(output->get_requirements());
3943

src/osmdata.cpp

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ osmdata_t::osmdata_t(std::unique_ptr<dependency_manager_t> dependency_manager,
3333
m_output(std::move(output)), m_conninfo(options.database_options.conninfo()),
3434
m_bbox(options.bbox), m_num_procs(options.num_procs),
3535
m_append(options.append), m_droptemp(options.droptemp),
36-
m_parallel_indexing(options.parallel_indexing),
3736
m_with_extra_attrs(options.extra_attributes),
3837
m_with_forward_dependencies(options.with_forward_dependencies)
3938
{
@@ -373,27 +372,20 @@ void osmdata_t::reprocess_marked() const { m_output->reprocess_marked(); }
373372

374373
void osmdata_t::postprocess_database() const
375374
{
376-
unsigned int const num_threads = m_parallel_indexing ? m_num_procs : 1U;
377-
log_debug("Starting pool with {} threads.", num_threads);
378-
379-
// All the intensive parts of this are long-running PostgreSQL commands.
380-
// They will be run in a thread pool.
381-
thread_pool_t pool{num_threads};
382-
383375
if (m_droptemp) {
384376
// When dropping middle tables, make sure they are gone before
385377
// indexing starts.
386-
m_mid->stop(pool);
378+
m_mid->stop();
387379
}
388380

389-
m_output->stop(&pool);
381+
m_output->stop();
390382

391383
if (!m_droptemp) {
392384
// When keeping middle tables, there is quite a large index created
393385
// which is better done after the output tables have been copied.
394386
// Note that --disable-parallel-indexing needs to be used to really
395387
// force the order.
396-
m_mid->stop(pool);
388+
m_mid->stop();
397389
}
398390

399391
// Waiting here for pool to execute all tasks.

src/osmdata.hpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@
2727
#include "dependency-manager.hpp"
2828
#include "osmtypes.hpp"
2929

30+
class middle_t;
3031
class options_t;
3132
class output_t;
32-
struct middle_t;
3333

3434
/**
3535
* This class guides the processing of the OSM data through its multiple
@@ -101,7 +101,6 @@ class osmdata_t : public osmium::handler::Handler
101101
unsigned int m_num_procs;
102102
bool m_append;
103103
bool m_droptemp;
104-
bool m_parallel_indexing;
105104
bool m_with_extra_attrs;
106105
bool m_with_forward_dependencies;
107106
};

0 commit comments

Comments
 (0)