Skip to content

Commit 2b59000

Browse files
committed
middle-pgsql: move connect and destruct to table_desc
1 parent 38fe8cd commit 2b59000

File tree

2 files changed

+139
-139
lines changed

2 files changed

+139
-139
lines changed

middle-pgsql.cpp

Lines changed: 136 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,94 @@ enum table_id {
4545
t_node, t_way, t_rel
4646
} ;
4747

48+
/**
49+
* Helper to create SQL queries.
50+
*
51+
* The input string is mangled as follows:
52+
* %p replaced by the content of the "prefix" option
53+
* %i replaced by the content of the "tblsslim_data" option
54+
* %t replaced by the content of the "tblssslim_index" option
55+
* %m replaced by "UNLOGGED" if the "unlogged" option is set
56+
* other occurrences of the "%" char are treated normally.
57+
* any occurrence of { or } will be ignored (not copied to output string);
58+
* anything inside {} is only copied if it contained at least one of
59+
* %p, %i, %t, %m that was not NULL.
60+
*
61+
* So, the input string
62+
* Hello{ dear %i}!
63+
* will, if i is set to "John", translate to
64+
* Hello dear John!
65+
* but if i is unset, translate to
66+
* Hello!
67+
*
68+
* This is used for constructing SQL queries with proper tablespace settings.
69+
*/
70+
static void set_prefix_and_tbls(options_t const *options, char const **string)
71+
{
72+
char buffer[1024];
73+
const char *source;
74+
char *dest;
75+
char *openbrace = nullptr;
76+
int copied = 0;
77+
78+
if (*string == nullptr) {
79+
return;
80+
}
81+
source = *string;
82+
dest = buffer;
83+
84+
while (*source) {
85+
if (*source == '{') {
86+
openbrace = dest;
87+
copied = 0;
88+
source++;
89+
continue;
90+
} else if (*source == '}') {
91+
if (!copied && openbrace)
92+
dest = openbrace;
93+
source++;
94+
continue;
95+
} else if (*source == '%') {
96+
if (*(source + 1) == 'p') {
97+
if (!options->prefix.empty()) {
98+
strcpy(dest, options->prefix.c_str());
99+
dest += strlen(options->prefix.c_str());
100+
copied = 1;
101+
}
102+
source += 2;
103+
continue;
104+
} else if (*(source + 1) == 't') {
105+
if (options->tblsslim_data) {
106+
strcpy(dest, options->tblsslim_data->c_str());
107+
dest += strlen(options->tblsslim_data->c_str());
108+
copied = 1;
109+
}
110+
source += 2;
111+
continue;
112+
} else if (*(source + 1) == 'i') {
113+
if (options->tblsslim_index) {
114+
strcpy(dest, options->tblsslim_index->c_str());
115+
dest += strlen(options->tblsslim_index->c_str());
116+
copied = 1;
117+
}
118+
source += 2;
119+
continue;
120+
} else if (*(source + 1) == 'm') {
121+
if (options->unlogged) {
122+
strcpy(dest, "UNLOGGED");
123+
dest += 8;
124+
copied = 1;
125+
}
126+
source += 2;
127+
continue;
128+
}
129+
}
130+
*(dest++) = *(source++);
131+
}
132+
*dest = 0;
133+
*string = strdup(buffer);
134+
}
135+
48136
middle_pgsql_t::table_desc::table_desc(const char *name_, const char *create_,
49137
const char *create_index_,
50138
const char *prepare_,
@@ -55,6 +143,44 @@ middle_pgsql_t::table_desc::table_desc(const char *name_, const char *create_,
55143
copyMode(0), sql_conn(nullptr), transactionMode(0)
56144
{}
57145

146+
void middle_pgsql_t::table_desc::connect(options_t const *options)
147+
{
148+
set_prefix_and_tbls(options, &name);
149+
set_prefix_and_tbls(options, &create);
150+
set_prefix_and_tbls(options, &create_index);
151+
set_prefix_and_tbls(options, &prepare);
152+
set_prefix_and_tbls(options, &prepare_intarray);
153+
set_prefix_and_tbls(options, &array_indexes);
154+
155+
fprintf(stderr, "Setting up table: %s\n", name);
156+
sql_conn = PQconnectdb(options->database_options.conninfo().c_str());
157+
158+
// Check to see that the backend connection was successfully made, and if not, exit */
159+
if (PQstatus(sql_conn) != CONNECTION_OK) {
160+
fprintf(stderr, "Connection to database failed: %s\n",
161+
PQerrorMessage(sql_conn));
162+
util::exit_nicely();
163+
}
164+
165+
/*
166+
*
167+
* To allow for parallelisation, the second phase (iterate_ways), cannot be run
168+
* in an extended transaction and each update statement is its own transaction.
169+
* Therefore commit rate of postgresql is very important to ensure high speed.
170+
* If fsync is enabled to ensure safe transactions, the commit rate can be very low.
171+
* To compensate for this, one can set the postgresql parameter synchronous_commit
172+
* to off. This means an update statement returns to the client as success before the
173+
* transaction is saved to disk via fsync, which in return allows to bunch up multiple
174+
* transactions into a single fsync. This may result in some data loss in the case of a
175+
* database crash. However, as we don't currently have the ability to restart a full osm2pgsql
176+
* import session anyway, this is fine. Diff imports are also not effected, as the next
177+
* diff import would simply deal with all pending ways that were not previously finished.
178+
* This parameter does not effect safety from data corruption on the back-end.
179+
*/
180+
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "SET synchronous_commit TO off;");
181+
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "SET client_min_messages = WARNING");
182+
}
183+
58184
void middle_pgsql_t::table_desc::begin_copy()
59185
{
60186
pgsql_exec(sql_conn, PGRES_COPY_IN, "COPY %s FROM STDIN", name);
@@ -121,6 +247,13 @@ void middle_pgsql_t::table_desc::commit()
121247
}
122248
}
123249

250+
middle_pgsql_t::table_desc::~table_desc()
251+
{
252+
if (sql_conn) {
253+
PQfinish(sql_conn);
254+
}
255+
}
256+
124257
namespace {
125258
// Decodes a portion of an array literal from postgres */
126259
// Argument should point to beginning of literal, on return points to delimiter */
@@ -856,114 +989,6 @@ void middle_pgsql_t::analyze()
856989
}
857990
}
858991

859-
/**
860-
* Helper to create SQL queries.
861-
*
862-
* The input string is mangled as follows:
863-
* %p replaced by the content of the "prefix" option
864-
* %i replaced by the content of the "tblsslim_data" option
865-
* %t replaced by the content of the "tblssslim_index" option
866-
* %m replaced by "UNLOGGED" if the "unlogged" option is set
867-
* other occurrences of the "%" char are treated normally.
868-
* any occurrence of { or } will be ignored (not copied to output string);
869-
* anything inside {} is only copied if it contained at least one of
870-
* %p, %i, %t, %m that was not NULL.
871-
*
872-
* So, the input string
873-
* Hello{ dear %i}!
874-
* will, if i is set to "John", translate to
875-
* Hello dear John!
876-
* but if i is unset, translate to
877-
* Hello!
878-
*
879-
* This is used for constructing SQL queries with proper tablespace settings.
880-
*/
881-
static void set_prefix_and_tbls(const struct options_t *options, const char **string)
882-
{
883-
char buffer[1024];
884-
const char *source;
885-
char *dest;
886-
char *openbrace = nullptr;
887-
int copied = 0;
888-
889-
if (*string == nullptr) {
890-
return;
891-
}
892-
source = *string;
893-
dest = buffer;
894-
895-
while (*source) {
896-
if (*source == '{') {
897-
openbrace = dest;
898-
copied = 0;
899-
source++;
900-
continue;
901-
} else if (*source == '}') {
902-
if (!copied && openbrace) dest = openbrace;
903-
source++;
904-
continue;
905-
} else if (*source == '%') {
906-
if (*(source+1) == 'p') {
907-
if (!options->prefix.empty()) {
908-
strcpy(dest, options->prefix.c_str());
909-
dest += strlen(options->prefix.c_str());
910-
copied = 1;
911-
}
912-
source+=2;
913-
continue;
914-
} else if (*(source+1) == 't') {
915-
if (options->tblsslim_data) {
916-
strcpy(dest, options->tblsslim_data->c_str());
917-
dest += strlen(options->tblsslim_data->c_str());
918-
copied = 1;
919-
}
920-
source+=2;
921-
continue;
922-
} else if (*(source+1) == 'i') {
923-
if (options->tblsslim_index) {
924-
strcpy(dest, options->tblsslim_index->c_str());
925-
dest += strlen(options->tblsslim_index->c_str());
926-
copied = 1;
927-
}
928-
source+=2;
929-
continue;
930-
} else if (*(source+1) == 'm') {
931-
if (options->unlogged) {
932-
strcpy(dest, "UNLOGGED");
933-
dest += 8;
934-
copied = 1;
935-
}
936-
source+=2;
937-
continue;
938-
}
939-
}
940-
*(dest++) = *(source++);
941-
}
942-
*dest = 0;
943-
*string = strdup(buffer);
944-
}
945-
946-
void middle_pgsql_t::connect(table_desc& table) {
947-
PGconn *sql_conn;
948-
949-
set_prefix_and_tbls(out_options, &(table.name));
950-
set_prefix_and_tbls(out_options, &(table.create));
951-
set_prefix_and_tbls(out_options, &(table.create_index));
952-
set_prefix_and_tbls(out_options, &(table.prepare));
953-
set_prefix_and_tbls(out_options, &(table.prepare_intarray));
954-
set_prefix_and_tbls(out_options, &(table.array_indexes));
955-
956-
fprintf(stderr, "Setting up table: %s\n", table.name);
957-
sql_conn = PQconnectdb(out_options->database_options.conninfo().c_str());
958-
959-
// Check to see that the backend connection was successfully made, and if not, exit */
960-
if (PQstatus(sql_conn) != CONNECTION_OK) {
961-
fprintf(stderr, "Connection to database failed: %s\n", PQerrorMessage(sql_conn));
962-
util::exit_nicely();
963-
}
964-
table.sql_conn = sql_conn;
965-
}
966-
967992
void middle_pgsql_t::start(const options_t *out_options_)
968993
{
969994
out_options = out_options_;
@@ -995,27 +1020,10 @@ void middle_pgsql_t::start(const options_t *out_options_)
9951020
fprintf(stderr, "Mid: pgsql, cache=%d\n", out_options->cache);
9961021

9971022
// We use a connection per table to enable the use of COPY */
998-
for (auto& table: tables) {
999-
connect(table);
1023+
for (auto &table : tables) {
1024+
table.connect(out_options);
10001025
PGconn* sql_conn = table.sql_conn;
10011026

1002-
/*
1003-
* To allow for parallelisation, the second phase (iterate_ways), cannot be run
1004-
* in an extended transaction and each update statement is its own transaction.
1005-
* Therefore commit rate of postgresql is very important to ensure high speed.
1006-
* If fsync is enabled to ensure safe transactions, the commit rate can be very low.
1007-
* To compensate for this, one can set the postgresql parameter synchronous_commit
1008-
* to off. This means an update statement returns to the client as success before the
1009-
* transaction is saved to disk via fsync, which in return allows to bunch up multiple
1010-
* transactions into a single fsync. This may result in some data loss in the case of a
1011-
* database crash. However, as we don't currently have the ability to restart a full osm2pgsql
1012-
* import session anyway, this is fine. Diff imports are also not effected, as the next
1013-
* diff import would simply deal with all pending ways that were not previously finished.
1014-
* This parameter does not effect safety from data corruption on the back-end.
1015-
*/
1016-
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "SET synchronous_commit TO off;");
1017-
1018-
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "SET client_min_messages = WARNING");
10191027
if (dropcreate) {
10201028
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "DROP TABLE IF EXISTS %s CASCADE", table.name);
10211029
}
@@ -1125,15 +1133,6 @@ middle_pgsql_t::middle_pgsql_t()
11251133
// clang-format on
11261134
}
11271135

1128-
middle_pgsql_t::~middle_pgsql_t() {
1129-
for (auto &table : tables) {
1130-
if (table.sql_conn) {
1131-
PQfinish(table.sql_conn);
1132-
}
1133-
}
1134-
1135-
}
1136-
11371136
std::shared_ptr<middle_query_t>
11381137
middle_pgsql_t::get_query_instance(std::shared_ptr<middle_t> const &from) const
11391138
{
@@ -1153,7 +1152,7 @@ middle_pgsql_t::get_query_instance(std::shared_ptr<middle_t> const &from) const
11531152

11541153
// We use a connection per table to enable the use of COPY
11551154
for (int i = 0; i < NUM_TABLES; i++) {
1156-
mid->connect(mid->tables[i]);
1155+
mid->tables[i].connect(mid->out_options);
11571156
PGconn* sql_conn = mid->tables[i].sql_conn;
11581157

11591158
if (mid->tables[i].prepare) {

middle-pgsql.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
struct middle_pgsql_t : public slim_middle_t {
1919
middle_pgsql_t();
20-
virtual ~middle_pgsql_t();
2120

2221
void start(const options_t *out_options_) override;
2322
void stop(osmium::thread::Pool &pool) override;
@@ -60,6 +59,8 @@ struct middle_pgsql_t : public slim_middle_t {
6059
const char *prepare_intarray_ = NULL,
6160
const char *array_indexes_ = NULL);
6261

62+
~table_desc();
63+
6364
const char *name;
6465
const char *create;
6566
const char *create_index;
@@ -70,6 +71,7 @@ struct middle_pgsql_t : public slim_middle_t {
7071
int copyMode; /* True if we are in copy mode */
7172
struct pg_conn *sql_conn;
7273

74+
void connect(options_t const *options);
7375
void begin();
7476
void begin_copy();
7577
void end_copy();
@@ -95,7 +97,6 @@ struct middle_pgsql_t : public slim_middle_t {
9597
/**
9698
* Sets up sql_conn for the table
9799
*/
98-
void connect(table_desc& table);
99100
void local_nodes_set(osmium::Node const &node);
100101
size_t local_nodes_get_list(osmium::WayNodeList *nodes) const;
101102
void local_nodes_delete(osmid_t osm_id);

0 commit comments

Comments
 (0)