Skip to content

Commit 71fce29

Browse files
committed
middle-pgsql: move stop and end_copy into table desc
1 parent 41a0eab commit 71fce29

File tree

2 files changed

+69
-72
lines changed

2 files changed

+69
-72
lines changed

middle-pgsql.cpp

Lines changed: 63 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,56 @@ middle_pgsql_t::table_desc::table_desc(const char *name_,
6363
prepare_intarray(prepare_intarray_),
6464
copy(copy_),
6565
analyze(analyze_),
66-
stop(stop_),
66+
commit(stop_),
6767
array_indexes(array_indexes_),
6868
copyMode(0),
6969
transactionMode(0),
7070
sql_conn(nullptr)
7171
{}
7272

73+
void middle_pgsql_t::table_desc::end_copy()
74+
{
75+
// Terminate any pending COPY */
76+
if (copyMode) {
77+
int stop = PQputCopyEnd(sql_conn, nullptr);
78+
if (stop != 1) {
79+
fprintf(stderr, "COPY_END for %s failed: %s\n", copy, PQerrorMessage(sql_conn));
80+
util::exit_nicely();
81+
}
82+
83+
pg_result_t res(PQgetResult(sql_conn));
84+
if (PQresultStatus(res.get()) != PGRES_COMMAND_OK) {
85+
fprintf(stderr, "COPY_END for %s failed: %s\n", copy, PQerrorMessage(sql_conn));
86+
util::exit_nicely();
87+
}
88+
copyMode = 0;
89+
}
90+
}
91+
92+
void middle_pgsql_t::table_desc::stop(bool droptemp, bool build_indexes)
93+
{
94+
time_t start, end;
95+
96+
fprintf(stderr, "Stopping table: %s\n", name);
97+
98+
end_copy();
99+
100+
time(&start);
101+
if (droptemp) {
102+
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "DROP TABLE %s", name);
103+
} else if (build_indexes && array_indexes) {
104+
fprintf(stderr, "Building index on table: %s\n", name);
105+
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", array_indexes);
106+
}
107+
108+
PQfinish(sql_conn);
109+
sql_conn = nullptr;
110+
time(&end);
111+
112+
fprintf(stderr, "Stopped table: %s in %is\n", name, (int)(end - start));
113+
}
114+
115+
73116
namespace {
74117
// Decodes a portion of an array literal from postgres */
75118
// Argument should point to beginning of literal, on return points to delimiter */
@@ -160,26 +203,6 @@ void pgsql_parse_nodes(const char *string, osmium::memory::Buffer &buffer,
160203
}
161204
}
162205

163-
int pgsql_endCopy(middle_pgsql_t::table_desc *table)
164-
{
165-
// Terminate any pending COPY */
166-
if (table->copyMode) {
167-
PGconn *sql_conn = table->sql_conn;
168-
int stop = PQputCopyEnd(sql_conn, nullptr);
169-
if (stop != 1) {
170-
fprintf(stderr, "COPY_END for %s failed: %s\n", table->copy, PQerrorMessage(sql_conn));
171-
util::exit_nicely();
172-
}
173-
174-
pg_result_t res(PQgetResult(sql_conn));
175-
if (PQresultStatus(res.get()) != PGRES_COMMAND_OK) {
176-
fprintf(stderr, "COPY_END for %s failed: %s\n", table->copy, PQerrorMessage(sql_conn));
177-
util::exit_nicely();
178-
}
179-
table->copyMode = 0;
180-
}
181-
return 0;
182-
}
183206
} // anonymous namespace
184207

185208

@@ -366,7 +389,7 @@ void middle_pgsql_t::local_nodes_delete(osmid_t osm_id)
366389
char const *paramValues[1];
367390
char buffer[64];
368391
// Make sure we're out of copy mode */
369-
pgsql_endCopy(&tables[NODE_TABLE]);
392+
tables[NODE_TABLE].end_copy();
370393

371394
sprintf( buffer, "%" PRIdOSMID, osm_id );
372395
paramValues[0] = buffer;
@@ -392,8 +415,8 @@ void middle_pgsql_t::node_changed(osmid_t osm_id)
392415
char const *paramValues[1];
393416
char buffer[64];
394417
// Make sure we're out of copy mode */
395-
pgsql_endCopy(&tables[WAY_TABLE]);
396-
pgsql_endCopy(&tables[REL_TABLE]);
418+
tables[WAY_TABLE].end_copy();
419+
tables[REL_TABLE].end_copy();
397420

398421
sprintf( buffer, "%" PRIdOSMID, osm_id );
399422
paramValues[0] = buffer;
@@ -570,7 +593,7 @@ void middle_pgsql_t::ways_delete(osmid_t osm_id)
570593
char const *paramValues[1];
571594
char buffer[64];
572595
// Make sure we're out of copy mode */
573-
pgsql_endCopy(&tables[WAY_TABLE]);
596+
tables[WAY_TABLE].end_copy();
574597

575598
sprintf( buffer, "%" PRIdOSMID, osm_id );
576599
paramValues[0] = buffer;
@@ -582,7 +605,7 @@ void middle_pgsql_t::iterate_ways(middle_t::pending_processor& pf)
582605
{
583606

584607
// Make sure we're out of copy mode */
585-
pgsql_endCopy(&tables[WAY_TABLE]);
608+
tables[WAY_TABLE].end_copy();
586609

587610
// enqueue the jobs
588611
osmid_t id;
@@ -602,7 +625,7 @@ void middle_pgsql_t::way_changed(osmid_t osm_id)
602625
char const *paramValues[1];
603626
char buffer[64];
604627
// Make sure we're out of copy mode */
605-
pgsql_endCopy(&tables[REL_TABLE]);
628+
tables[REL_TABLE].end_copy();
606629

607630
sprintf( buffer, "%" PRIdOSMID, osm_id );
608631
paramValues[0] = buffer;
@@ -735,8 +758,8 @@ void middle_pgsql_t::relations_delete(osmid_t osm_id)
735758
char const *paramValues[1];
736759
char buffer[64];
737760
// Make sure we're out of copy mode */
738-
pgsql_endCopy(&tables[WAY_TABLE]);
739-
pgsql_endCopy(&tables[REL_TABLE]);
761+
tables[WAY_TABLE].end_copy();
762+
tables[REL_TABLE].end_copy();
740763

741764
sprintf( buffer, "%" PRIdOSMID, osm_id );
742765
paramValues[0] = buffer;
@@ -758,7 +781,7 @@ void middle_pgsql_t::relations_delete(osmid_t osm_id)
758781
void middle_pgsql_t::iterate_relations(pending_processor& pf)
759782
{
760783
// Make sure we're out of copy mode */
761-
pgsql_endCopy(&tables[REL_TABLE]);
784+
tables[REL_TABLE].end_copy();
762785

763786
// enqueue the jobs
764787
osmid_t id;
@@ -778,7 +801,7 @@ void middle_pgsql_t::relation_changed(osmid_t osm_id)
778801
char const *paramValues[1];
779802
char buffer[64];
780803
// Make sure we're out of copy mode */
781-
pgsql_endCopy(&tables[REL_TABLE]);
804+
tables[REL_TABLE].end_copy();
782805

783806
sprintf( buffer, "%" PRIdOSMID, osm_id );
784807
paramValues[0] = buffer;
@@ -927,7 +950,7 @@ void middle_pgsql_t::connect(table_desc& table) {
927950
set_prefix_and_tbls(out_options, &(table.prepare_intarray));
928951
set_prefix_and_tbls(out_options, &(table.copy));
929952
set_prefix_and_tbls(out_options, &(table.analyze));
930-
set_prefix_and_tbls(out_options, &(table.stop));
953+
set_prefix_and_tbls(out_options, &(table.commit));
931954
set_prefix_and_tbls(out_options, &(table.array_indexes));
932955

933956
fprintf(stderr, "Setting up table: %s\n", table.name);
@@ -961,9 +984,6 @@ void middle_pgsql_t::start(const options_t *out_options_)
961984
}
962985

963986
append = out_options->append;
964-
// reset this on every start to avoid options from last run
965-
// staying set for the second.
966-
build_indexes = !append && !out_options->droptemp;
967987

968988
cache.reset(new node_ram_cache(out_options->alloc_chunkwise | ALLOC_LOSSY,
969989
out_options->cache));
@@ -1031,9 +1051,9 @@ void middle_pgsql_t::start(const options_t *out_options_)
10311051
void middle_pgsql_t::commit(void) {
10321052
for (auto& table: tables) {
10331053
PGconn *sql_conn = table.sql_conn;
1034-
pgsql_endCopy(&table);
1035-
if (table.stop && table.transactionMode) {
1036-
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", table.stop);
1054+
table.end_copy();
1055+
if (table.commit && table.transactionMode) {
1056+
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", table.commit);
10371057
table.transactionMode = 0;
10381058
}
10391059
}
@@ -1042,35 +1062,10 @@ void middle_pgsql_t::commit(void) {
10421062
void middle_pgsql_t::flush()
10431063
{
10441064
for (auto &table : tables) {
1045-
pgsql_endCopy(&table);
1065+
table.end_copy();
10461066
}
10471067
}
10481068

1049-
void middle_pgsql_t::pgsql_stop_one(table_desc *table)
1050-
{
1051-
time_t start, end;
1052-
1053-
PGconn *sql_conn = table->sql_conn;
1054-
1055-
fprintf(stderr, "Stopping table: %s\n", table->name);
1056-
pgsql_endCopy(table);
1057-
time(&start);
1058-
if (out_options->droptemp)
1059-
{
1060-
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "DROP TABLE %s", table->name);
1061-
}
1062-
else if (build_indexes && table->array_indexes)
1063-
{
1064-
fprintf(stderr, "Building index on table: %s\n", table->name);
1065-
pgsql_exec(sql_conn, PGRES_COMMAND_OK, "%s", table->array_indexes);
1066-
}
1067-
1068-
PQfinish(sql_conn);
1069-
table->sql_conn = nullptr;
1070-
time(&end);
1071-
fprintf(stderr, "Stopped table: %s in %is\n", table->name, (int)(end - start));
1072-
}
1073-
10741069
void middle_pgsql_t::stop(osmium::thread::Pool &pool)
10751070
{
10761071
cache.reset();
@@ -1082,17 +1077,18 @@ void middle_pgsql_t::stop(osmium::thread::Pool &pool)
10821077
// Dropping the tables is fast, so do it synchronously to guarantee
10831078
// that the space is freed before creating the other indices.
10841079
for (auto &t : tables) {
1085-
pgsql_stop_one(&t);
1080+
t.stop(out_options->droptemp, !append);
10861081
}
10871082
} else {
10881083
for (auto &t : tables) {
1089-
pool.submit(std::bind(&middle_pgsql_t::pgsql_stop_one, this, &t));
1084+
pool.submit(std::bind(&middle_pgsql_t::table_desc::stop,
1085+
&t, out_options->droptemp, !append));
10901086
}
10911087
}
10921088
}
10931089

10941090
middle_pgsql_t::middle_pgsql_t()
1095-
: append(false), mark_pending(true), build_indexes(true)
1091+
: append(false), mark_pending(true)
10961092
{
10971093
// clang-format off
10981094
/*table = t_node,*/

middle-pgsql.hpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ struct middle_pgsql_t : public slim_middle_t {
5151

5252
idlist_t relations_using_way(osmid_t way_id) const override;
5353

54-
struct table_desc {
54+
struct table_desc
55+
{
5556
table_desc(const char *name_ = NULL,
5657
const char *start_ = NULL,
5758
const char *create_ = NULL,
@@ -71,12 +72,15 @@ struct middle_pgsql_t : public slim_middle_t {
7172
const char *prepare_intarray;
7273
const char *copy;
7374
const char *analyze;
74-
const char *stop;
75+
const char *commit;
7576
const char *array_indexes;
7677

7778
int copyMode; /* True if we are in copy mode */
7879
int transactionMode; /* True if we are in an extended transaction */
7980
struct pg_conn *sql_conn;
81+
82+
void end_copy();
83+
void stop(bool droptemp, bool build_indexes);
8084
};
8185

8286
std::shared_ptr<middle_query_t>
@@ -91,8 +95,6 @@ struct middle_pgsql_t : public slim_middle_t {
9195
NUM_TABLES
9296
};
9397

94-
void pgsql_stop_one(table_desc *table);
95-
9698
/**
9799
* Sets up sql_conn for the table
98100
*/
@@ -116,7 +118,6 @@ struct middle_pgsql_t : public slim_middle_t {
116118

117119
void buffer_correct_params(char const **param, size_t size);
118120

119-
bool build_indexes;
120121
std::string copy_buffer;
121122
};
122123

0 commit comments

Comments
 (0)