Skip to content

Commit 39665dc

Browse files
committed
split middle-pgsql into insert and query class
The query instance can work on a single connection. This saves a couple of connections in the processing stage. Requires slightly changing the behviour of the connections that copy in the data. They need to commit, so that the data becomes visible. middle also needs to start the tables earlier, because the tables must have been created when preparing queries in the query connection.
1 parent 5af4f03 commit 39665dc

18 files changed

+226
-164
lines changed

middle-pgsql.cpp

Lines changed: 108 additions & 73 deletions
Large diffs are not rendered by default.

middle-pgsql.hpp

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,29 +17,54 @@
1717
#include "node-ram-cache.hpp"
1818
#include "pgsql.hpp"
1919

20-
struct middle_pgsql_t : public slim_middle_t, public middle_query_t
20+
class middle_query_pgsql_t : public middle_query_t
2121
{
22-
middle_pgsql_t();
22+
public:
23+
middle_query_pgsql_t(
24+
char const *conninfo, std::shared_ptr<node_ram_cache> const &cache,
25+
std::shared_ptr<node_persistent_cache> const &persistent_cache);
26+
~middle_query_pgsql_t();
2327

24-
void start(const options_t *out_options_) override;
28+
size_t nodes_get_list(osmium::WayNodeList *nodes) const override;
29+
30+
bool ways_get(osmid_t id, osmium::memory::Buffer &buffer) const override;
31+
size_t rel_way_members_get(osmium::Relation const &rel, rolelist_t *roles,
32+
osmium::memory::Buffer &buffer) const override;
33+
34+
idlist_t relations_using_way(osmid_t way_id) const override;
35+
bool relations_get(osmid_t id,
36+
osmium::memory::Buffer &buffer) const override;
37+
38+
void exec_sql(std::string const &sql_cmd) const;
39+
40+
private:
41+
size_t local_nodes_get_list(osmium::WayNodeList *nodes) const;
42+
43+
pg_result_t exec_prepared(char const *stmt, char const *param) const;
44+
pg_result_t exec_prepared(char const *stmt, osmid_t osm_id) const;
45+
46+
struct pg_conn *m_sql_conn;
47+
std::shared_ptr<node_ram_cache> m_cache;
48+
std::shared_ptr<node_persistent_cache> m_persistent_cache;
49+
};
50+
51+
struct middle_pgsql_t : public slim_middle_t
52+
{
53+
middle_pgsql_t(options_t const *options);
54+
55+
void start() override;
2556
void stop(osmium::thread::Pool &pool) override;
2657
void analyze() override;
2758
void commit() override;
2859

2960
void nodes_set(osmium::Node const &node) override;
30-
size_t nodes_get_list(osmium::WayNodeList *nodes) const override;
3161
void nodes_delete(osmid_t id) override;
3262
void node_changed(osmid_t id) override;
3363

3464
void ways_set(osmium::Way const &way) override;
35-
bool ways_get(osmid_t id, osmium::memory::Buffer &buffer) const override;
36-
size_t rel_way_members_get(osmium::Relation const &rel, rolelist_t *roles,
37-
osmium::memory::Buffer &buffer) const override;
38-
3965
void ways_delete(osmid_t id) override;
4066
void way_changed(osmid_t id) override;
4167

42-
bool relations_get(osmid_t id, osmium::memory::Buffer &buffer) const override;
4368
void relations_set(osmium::Relation const &rel) override;
4469
void relations_delete(osmid_t id) override;
4570
void relation_changed(osmid_t id) override;
@@ -51,13 +76,12 @@ struct middle_pgsql_t : public slim_middle_t, public middle_query_t
5176

5277
size_t pending_count() const override;
5378

54-
idlist_t relations_using_way(osmid_t way_id) const override;
55-
5679
class table_desc
5780
{
5881
public:
5982
table_desc() : sql_conn(nullptr) {}
60-
table_desc(char const *name, char const *create,
83+
table_desc(options_t const *options, char const *name,
84+
char const *create, char const *prepare_query,
6185
char const *prepare = "", char const *prepare_intarray = "",
6286
char const *array_indexes = "");
6387

@@ -68,8 +92,9 @@ struct middle_pgsql_t : public slim_middle_t, public middle_query_t
6892

6993
int copyMode; /* True if we are in copy mode */
7094
struct pg_conn *sql_conn;
95+
std::string m_prepare_query;
7196

72-
void connect(options_t const *options);
97+
void connect(char const *conninfo);
7398
void begin();
7499
void prepare_queries(bool append);
75100
void create();
@@ -109,7 +134,6 @@ struct middle_pgsql_t : public slim_middle_t, public middle_query_t
109134
* Sets up sql_conn for the table
110135
*/
111136
void local_nodes_set(osmium::Node const &node);
112-
size_t local_nodes_get_list(osmium::WayNodeList *nodes) const;
113137
void local_nodes_delete(osmid_t osm_id);
114138

115139
table_desc tables[NUM_TABLES];

middle-ram.cpp

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -155,11 +155,7 @@ void middle_ram_t::analyze(void)
155155
/* No need */
156156
}
157157

158-
void middle_ram_t::start(const options_t *options)
159-
{
160-
extra_attributes = options->extra_attributes;
161-
cache.reset(new node_ram_cache(options->alloc_chunkwise, options->cache));
162-
}
158+
void middle_ram_t::start() {}
163159

164160
void middle_ram_t::stop(osmium::thread::Pool &pool)
165161
{
@@ -172,8 +168,10 @@ void middle_ram_t::stop(osmium::thread::Pool &pool)
172168
void middle_ram_t::commit(void) {
173169
}
174170

175-
middle_ram_t::middle_ram_t():
176-
ways(), rels(), cache(), simulate_ways_deleted(false)
171+
middle_ram_t::middle_ram_t(options_t const *options)
172+
: ways(), rels(),
173+
cache(new node_ram_cache(options->alloc_chunkwise, options->cache)),
174+
extra_attributes(options->extra_attributes), simulate_ways_deleted(false)
177175
{
178176
}
179177

middle-ram.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,10 @@ class elem_cache_t
8282

8383
struct middle_ram_t : public middle_t, public middle_query_t
8484
{
85-
middle_ram_t();
85+
middle_ram_t(options_t const *options);
8686
virtual ~middle_ram_t();
8787

88-
void start(const options_t *options) override;
88+
void start() override;
8989
void stop(osmium::thread::Pool &pool) override;
9090
void analyze(void) override;
9191
void commit(void) override;

middle.hpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
#include "osmtypes.hpp"
1818
#include "reprojection.hpp"
1919

20-
struct options_t;
21-
2220
/**
2321
* Infterface for returning information about raw OSM input data from a cache.
2422
*/
@@ -84,7 +82,7 @@ struct middle_t
8482
{
8583
virtual ~middle_t() = 0;
8684

87-
virtual void start(options_t const *out_options) = 0;
85+
virtual void start() = 0;
8886
virtual void stop(osmium::thread::Pool &pool) = 0;
8987
virtual void analyze(void) = 0;
9088
virtual void commit(void) = 0;

osm2pgsql.cpp

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,20 +56,17 @@ int main(int argc, char *argv[])
5656

5757
//setup the middle and backend (output)
5858
std::shared_ptr<middle_t> middle;
59-
std::vector<std::shared_ptr<output_t>> outputs;
6059

6160
if (options.slim) {
62-
auto mid = std::make_shared<middle_pgsql_t>();
63-
outputs = output_t::create_outputs(
64-
std::static_pointer_cast<middle_query_t>(mid), options);
65-
middle = std::static_pointer_cast<middle_t>(mid);
61+
middle = std::shared_ptr<middle_t>(new middle_pgsql_t(&options));
6662
} else {
67-
auto mid = std::make_shared<middle_ram_t>();
68-
outputs = output_t::create_outputs(
69-
std::static_pointer_cast<middle_query_t>(mid), options);
70-
middle = std::static_pointer_cast<middle_t>(mid);
63+
middle = std::shared_ptr<middle_t>(new middle_ram_t(&options));
7164
}
7265

66+
middle->start();
67+
68+
auto outputs = output_t::create_outputs(
69+
middle->get_query_instance(middle), options);
7370
//let osmdata orchestrate between the middle and the outs
7471
osmdata_t osmdata(middle, outputs);
7572

osmdata.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,6 @@ void osmdata_t::start() {
171171
for (auto& out: outs) {
172172
out->start();
173173
}
174-
mid->start(outs[0]->get_options());
175174
}
176175

177176
void osmdata_t::type_changed(osmium::item_type new_type)

tests/common.hpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,15 @@ namespace testing {
2222
char const *file_format)
2323
{
2424
//setup the middle
25-
auto middle = std::make_shared<MID>();
25+
std::shared_ptr<middle_t> middle(new MID(&options));
26+
middle->start();
2627

2728
//setup the backend (output)
2829
auto outputs = output_t::create_outputs(
29-
std::static_pointer_cast<middle_query_t>(middle), options);
30+
middle->get_query_instance(middle), options);
3031

3132
//let osmdata orchestrate between the middle and the outs
32-
osmdata_t osmdata(std::static_pointer_cast<middle_t>(middle), outputs);
33+
osmdata_t osmdata(middle, outputs);
3334

3435
parse(test_file, file_format, options, &osmdata);
3536
}

tests/middle-tests.hpp

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,24 +36,24 @@ class test_middle_helper
3636

3737
public:
3838
test_middle_helper(options_t const &options)
39-
: m_mid(std::make_shared<MID>()), m_output(m_mid, options),
39+
: m_mid(std::make_shared<MID>(&options)),
4040
m_buffer(4096, osmium::memory::Buffer::auto_grow::yes),
4141
m_proj(reprojection::create_projection(PROJ_LATLONG))
4242
{
43-
m_mid->start(&options);
43+
m_mid->start();
44+
m_mid_q = m_mid->get_query_instance(m_mid);
45+
m_output.reset(new output_null_t(m_mid_q, options));
4446
}
4547

46-
~test_middle_helper() { commit_and_stop(); }
47-
48-
void start(options_t const *options) { m_mid->start(options); }
49-
50-
void commit_and_stop()
48+
~test_middle_helper()
5149
{
5250
osmium::thread::Pool pool(1);
5351
m_mid->commit();
5452
m_mid->stop(pool);
5553
}
5654

55+
void commit() { m_mid->commit(); }
56+
5757
// tests that a single node can be set and retrieved. returns 0 on success.
5858
int test_node_set()
5959
{
@@ -65,9 +65,10 @@ class test_middle_helper
6565

6666
// set the node
6767
m_mid->nodes_set(node);
68+
m_mid->flush(osmium::item_type::way);
6869

6970
// get it back
70-
if (m_mid->nodes_get_list(&(way.nodes())) != way.nodes().size()) {
71+
if (m_mid_q->nodes_get_list(&(way.nodes())) != way.nodes().size()) {
7172
std::cerr << "ERROR: Unable to get node list.\n";
7273
return 1;
7374
}
@@ -128,9 +129,11 @@ class test_middle_helper
128129
ids.push_back(node.id());
129130
}
130131

132+
m_mid->flush(osmium::item_type::way);
133+
131134
auto &way = m_buffer.get<osmium::Way>(way_with_nodes(ids));
132135

133-
if (m_mid->nodes_get_list(&(way.nodes())) != ids.size()) {
136+
if (m_mid_q->nodes_get_list(&(way.nodes())) != ids.size()) {
134137
std::cerr << "ERROR: Unable to get node list.\n";
135138
return 1;
136139
}
@@ -191,7 +194,7 @@ class test_middle_helper
191194

192195
auto buf_pos = m_buffer.committed();
193196
rolelist_t roles;
194-
size_t way_count = m_mid->rel_way_members_get(rel, &roles, m_buffer);
197+
size_t way_count = m_mid_q->rel_way_members_get(rel, &roles, m_buffer);
195198
if (way_count != 1) {
196199
std::cerr << "ERROR: Unable to get way list.\n";
197200
return 1;
@@ -222,7 +225,7 @@ class test_middle_helper
222225
<< ", but got back " << way.id() << " from middle.\n";
223226
return 1;
224227
}
225-
m_mid->nodes_get_list(&(way.nodes()));
228+
m_mid_q->nodes_get_list(&(way.nodes()));
226229
for (size_t i = 0; i < nds.size(); ++i) {
227230
if (way.nodes()[i].location().lon() != lon) {
228231
std::cerr << "ERROR: Way node should have lon=" << lon
@@ -309,7 +312,8 @@ class test_middle_helper
309312
static constexpr double test_lat(osmid_t id) { return 1 + 1e-5 * id; }
310313

311314
std::shared_ptr<MID> m_mid;
312-
output_null_t m_output;
315+
std::shared_ptr<middle_query_t> m_mid_q;
316+
std::unique_ptr<output_null_t> m_output;
313317

314318
// simple osmium buffer to store all the objects in
315319
osmium::memory::Buffer m_buffer;

tests/mockups.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
struct dummy_middle_t : public middle_t {
88
virtual ~dummy_middle_t() = default;
99

10-
void start(const options_t *) override { }
10+
void start() override {}
1111
void stop(osmium::thread::Pool &) override {}
1212
void flush(osmium::item_type) override {}
1313
void cleanup(void) { }
@@ -26,7 +26,7 @@ struct dummy_middle_t : public middle_t {
2626
virtual size_t pending_count() const override { return 0; }
2727

2828
std::shared_ptr<middle_query_t>
29-
get_query_instance(std::shared_ptr<middle_t> const &mid) const override
29+
get_query_instance(std::shared_ptr<middle_t> const &) const override
3030
{
3131
return std::shared_ptr<middle_query_t>();
3232
}
@@ -35,7 +35,7 @@ struct dummy_middle_t : public middle_t {
3535
struct dummy_slim_middle_t : public slim_middle_t {
3636
virtual ~dummy_slim_middle_t() = default;
3737

38-
void start(const options_t *) override { }
38+
void start() override {}
3939
void stop(osmium::thread::Pool &) override {}
4040
void flush(osmium::item_type) override {}
4141
void cleanup(void) { }
@@ -54,7 +54,7 @@ struct dummy_slim_middle_t : public slim_middle_t {
5454
size_t pending_count() const override { return 0; }
5555

5656
std::shared_ptr<middle_query_t>
57-
get_query_instance(std::shared_ptr<middle_t> const &mid) const override
57+
get_query_instance(std::shared_ptr<middle_t> const &) const override
5858
{
5959
return std::shared_ptr<middle_query_t>();
6060
}

0 commit comments

Comments
 (0)