Skip to content

Commit 4eb7332

Browse files
authored
Merge pull request #1149 from joto/refactor-dependency-tracking-step2
Refactor dependency tracking step2
2 parents 74872fe + 768d94c commit 4eb7332

14 files changed

+133
-301
lines changed

src/middle-pgsql.cpp

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -321,14 +321,15 @@ void middle_pgsql_t::node_changed(osmid_t osm_id)
321321
return;
322322
}
323323

324-
//keep track of whatever ways and rels these nodes intersect
324+
// Find all ways referencing this node and mark them as pending.
325325
auto res = exec_prepared("mark_ways_by_node", osm_id);
326326
for (int i = 0; i < res.num_tuples(); ++i) {
327327
osmid_t const marked = osmium::string_to_object_id(res.get_value(i, 0));
328+
way_changed(marked);
328329
m_ways_pending_tracker->mark(marked);
329330
}
330331

331-
//do the rels too
332+
// Find all relations referencing this node and mark them as pending.
332333
res = exec_prepared("mark_rels_by_node", osm_id);
333334
for (int i = 0; i < res.num_tuples(); ++i) {
334335
osmid_t const marked = osmium::string_to_object_id(res.get_value(i, 0));
@@ -453,6 +454,11 @@ void middle_pgsql_t::iterate_ways(middle_t::pending_processor &pf)
453454
void middle_pgsql_t::way_changed(osmid_t osm_id)
454455
{
455456
assert(m_append);
457+
458+
if (m_ways_pending_tracker->is_marked(osm_id)) {
459+
return;
460+
}
461+
456462
//keep track of whatever rels this way intersects
457463
auto const res = exec_prepared("mark_rels_by_way", osm_id);
458464
for (int i = 0; i < res.num_tuples(); ++i) {
@@ -548,8 +554,6 @@ void middle_pgsql_t::iterate_relations(pending_processor &pf)
548554
while (id_tracker::is_valid(id = m_rels_pending_tracker->pop_mark())) {
549555
pf.enqueue_relations(id);
550556
}
551-
// in case we had higher ones than the middle
552-
pf.enqueue_relations(id_tracker::max());
553557

554558
//let the threads work on them
555559
pf.process_relations();
@@ -748,8 +752,9 @@ static table_sql sql_for_relations() noexcept
748752
" AND parts[way_off+1:rel_off] && ARRAY[$1];\n";
749753

750754
sql.prepare_mark = "PREPARE mark_rels_by_node(int8) AS"
751-
" SELECT id FROM {prefix}_ways"
752-
" WHERE nodes && ARRAY[$1];\n"
755+
" SELECT id FROM {prefix}_rels"
756+
" WHERE parts && ARRAY[$1]"
757+
" AND parts[1:way_off] && ARRAY[$1];\n"
753758
"PREPARE mark_rels_by_way(int8) AS"
754759
" SELECT id FROM {prefix}_rels"
755760
" WHERE parts && ARRAY[$1]"

src/osmdata.cpp

Lines changed: 8 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ struct pending_threaded_processor : public middle_t::pending_processor
171171
using output_vec_t = std::vector<std::shared_ptr<output_t>>;
172172

173173
static void do_jobs(output_vec_t const &outputs, pending_queue_t &queue,
174-
size_t &ids_done, std::mutex &mutex, int append,
174+
size_t &ids_done, std::mutex &mutex, bool append,
175175
bool ways)
176176
{
177177
while (true) {
@@ -297,27 +297,20 @@ struct pending_threaded_processor : public middle_t::pending_processor
297297
ids_queued = 0;
298298
ids_done = 0;
299299

300-
//collect all the new rels that became pending from each
301-
//output in each thread back to their respective main outputs
302300
for (auto const &clone : clones) {
303-
//for each clone/original output
304-
for (output_vec_t::const_iterator original_output = outs.begin(),
305-
clone_output = clone.begin();
306-
original_output != outs.end() && clone_output != clone.end();
307-
++original_output, ++clone_output) {
308-
//done copying ways for now
309-
clone_output->get()->commit();
310-
//merge the pending from this threads copy of output back
311-
original_output->get()->merge_pending_relations(
312-
clone_output->get());
301+
for (auto const &clone_output : clone) {
302+
clone_output.get()->commit();
313303
}
314304
}
315305
}
316306

317307
void enqueue_relations(osmid_t id) override
318308
{
319309
for (size_t i = 0; i < outs.size(); ++i) {
320-
outs[i]->enqueue_relations(queue, id, i, ids_queued);
310+
if (outs[i]->need_forward_dependencies()) {
311+
queue.emplace(id, i);
312+
++ids_queued;
313+
}
321314
}
322315
}
323316

@@ -401,21 +394,6 @@ struct pending_threaded_processor : public middle_t::pending_processor
401394

402395
} // anonymous namespace
403396

404-
/**
405-
* Is there any pending work in the middle or one of the outputs?
406-
*/
407-
bool osmdata_t::has_pending() const noexcept
408-
{
409-
if (m_mid->has_pending()) {
410-
return true;
411-
}
412-
413-
return std::any_of(m_outs.cbegin(), m_outs.cend(),
414-
[](std::shared_ptr<output_t> const &out) {
415-
return out->has_pending();
416-
});
417-
}
418-
419397
void osmdata_t::stop() const
420398
{
421399
/* Commit the transactions, so that multiple processes can
@@ -432,7 +410,7 @@ void osmdata_t::stop() const
432410
auto const *opts = m_outs[0]->get_options();
433411

434412
// are there any objects left pending?
435-
if (has_pending()) {
413+
if (m_mid->has_pending()) {
436414
//threaded pending processing
437415
pending_threaded_processor ptp(m_mid, m_outs, opts->num_procs,
438416
opts->append);

src/osmdata.hpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ class osmdata_t
3434

3535
private:
3636
slim_middle_t &slim_middle() const noexcept;
37-
bool has_pending() const noexcept;
3837

3938
std::shared_ptr<middle_t> m_mid;
4039
std::vector<std::shared_ptr<output_t>> m_outs;

src/output-flex.cpp

Lines changed: 2 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1007,7 +1007,7 @@ void output_flex_t::call_process_function(int index,
10071007
}
10081008
}
10091009

1010-
void output_flex_t::pending_way(osmid_t id, int exists)
1010+
void output_flex_t::pending_way(osmid_t id, bool exists)
10111011
{
10121012
if (!m_has_process_way) {
10131013
return;
@@ -1020,10 +1020,6 @@ void output_flex_t::pending_way(osmid_t id, int exists)
10201020

10211021
if (exists) {
10221022
way_delete(id);
1023-
auto const rel_ids = m_mid->relations_using_way(id);
1024-
for (auto const rel_id : rel_ids) {
1025-
m_rels_pending_tracker.mark(rel_id);
1026-
}
10271023
}
10281024

10291025
auto &way = m_buffer.get<osmium::Way>(0);
@@ -1035,51 +1031,7 @@ void output_flex_t::pending_way(osmid_t id, int exists)
10351031
m_buffer.clear();
10361032
}
10371033

1038-
void output_flex_t::enqueue_relations(pending_queue_t &job_queue, osmid_t id,
1039-
std::size_t output_id, std::size_t &added)
1040-
{
1041-
if (!m_has_process_relation) {
1042-
return;
1043-
}
1044-
1045-
osmid_t const prev = m_rels_pending_tracker.last_returned();
1046-
if (id_tracker::is_valid(prev) && prev >= id) {
1047-
if (prev > id) {
1048-
job_queue.emplace(id, output_id);
1049-
}
1050-
// already done the job
1051-
return;
1052-
}
1053-
1054-
//make sure we get the one passed in
1055-
if (id_tracker::is_valid(id)) {
1056-
job_queue.emplace(id, output_id);
1057-
++added;
1058-
}
1059-
1060-
//grab the first one or bail if its not valid
1061-
osmid_t popped = m_rels_pending_tracker.pop_mark();
1062-
if (!id_tracker::is_valid(popped)) {
1063-
return;
1064-
}
1065-
1066-
//get all the ones up to the id that was passed in
1067-
while (popped < id) {
1068-
job_queue.emplace(popped, output_id);
1069-
++added;
1070-
popped = m_rels_pending_tracker.pop_mark();
1071-
}
1072-
1073-
//make sure to get this one as well and move to the next
1074-
if (popped > id) {
1075-
if (id_tracker::is_valid(popped)) {
1076-
job_queue.emplace(popped, output_id);
1077-
++added;
1078-
}
1079-
}
1080-
}
1081-
1082-
void output_flex_t::pending_relation(osmid_t id, int exists)
1034+
void output_flex_t::pending_relation(osmid_t id, bool exists)
10831035
{
10841036
if (!m_has_process_relation) {
10851037
return;
@@ -1373,11 +1325,6 @@ void output_flex_t::init_lua(std::string const &filename)
13731325
lua_remove(lua_state(), 1); // global "osm2pgsql"
13741326
}
13751327

1376-
bool output_flex_t::has_pending() const
1377-
{
1378-
return !m_rels_pending_tracker.empty();
1379-
}
1380-
13811328
void output_flex_t::stage2_proc()
13821329
{
13831330
bool const has_marked_ways = !m_stage2_ways_tracker->empty();
@@ -1447,18 +1394,6 @@ void output_flex_t::stage2_proc()
14471394
}
14481395
}
14491396

1450-
void output_flex_t::merge_pending_relations(output_t *other)
1451-
{
1452-
auto *opgsql = dynamic_cast<output_flex_t *>(other);
1453-
if (opgsql) {
1454-
osmid_t id;
1455-
while (id_tracker::is_valid(
1456-
(id = opgsql->m_rels_pending_tracker.pop_mark()))) {
1457-
m_rels_pending_tracker.mark(id);
1458-
}
1459-
}
1460-
}
1461-
14621397
void output_flex_t::merge_expire_trees(output_t *other)
14631398
{
14641399
auto *opgsql = dynamic_cast<output_flex_t *>(other);

src/output-flex.hpp

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,8 @@ class output_flex_t : public output_t
6161

6262
void stage2_proc() override;
6363

64-
void pending_way(osmid_t id, int exists) override;
65-
66-
void enqueue_relations(pending_queue_t &job_queue, osmid_t id,
67-
std::size_t output_id, std::size_t &added) override;
68-
void pending_relation(osmid_t id, int exists) override;
64+
void pending_way(osmid_t id, bool exists) override;
65+
void pending_relation(osmid_t id, bool exists) override;
6966

7067
void node_add(osmium::Node const &node) override;
7168
void way_add(osmium::Way *way) override;
@@ -79,9 +76,6 @@ class output_flex_t : public output_t
7976
void way_delete(osmid_t id) override;
8077
void relation_delete(osmid_t id) override;
8178

82-
bool has_pending() const override;
83-
84-
void merge_pending_relations(output_t *other) override;
8579
void merge_expire_trees(output_t *other) override;
8680

8781
int app_define_table();
@@ -137,8 +131,6 @@ class output_flex_t : public output_t
137131
std::shared_ptr<std::vector<flex_table_t>> m_tables;
138132
std::vector<table_connection_t> m_table_connections;
139133

140-
id_tracker m_rels_pending_tracker;
141-
142134
std::shared_ptr<id_tracker> m_stage2_ways_tracker;
143135
std::shared_ptr<id_tracker> m_stage2_rels_tracker;
144136

src/output-gazetteer.hpp

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,12 +48,8 @@ class output_gazetteer_t : public output_t
4848

4949
bool need_forward_dependencies() const noexcept override { return false; }
5050

51-
void pending_way(osmid_t, int) override {}
52-
53-
void enqueue_relations(pending_queue_t &, osmid_t, size_t,
54-
size_t &) override
55-
{}
56-
void pending_relation(osmid_t, int) override {}
51+
void pending_way(osmid_t, bool) override {}
52+
void pending_relation(osmid_t, bool) override {}
5753

5854
void node_add(osmium::Node const &node) override;
5955
void way_add(osmium::Way *way) override;

src/output-multi.cpp

Lines changed: 2 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,7 @@ void output_multi_t::start()
6565
m_options.tblsmain_data);
6666
}
6767

68-
bool output_multi_t::has_pending() const
69-
{
70-
return !rels_pending_tracker.empty();
71-
}
72-
73-
void output_multi_t::pending_way(osmid_t id, int exists)
68+
void output_multi_t::pending_way(osmid_t id, bool exists)
7469
{
7570
// Try to fetch the way from the DB
7671
buffer.clear();
@@ -80,47 +75,7 @@ void output_multi_t::pending_way(osmid_t id, int exists)
8075
}
8176
}
8277

83-
void output_multi_t::enqueue_relations(pending_queue_t &job_queue, osmid_t id,
84-
size_t output_id, size_t &added)
85-
{
86-
osmid_t const prev = rels_pending_tracker.last_returned();
87-
if (id_tracker::is_valid(prev) && prev >= id) {
88-
if (prev > id) {
89-
job_queue.push(pending_job_t(id, output_id));
90-
}
91-
// already done the job
92-
return;
93-
}
94-
95-
//make sure we get the one passed in
96-
if (id_tracker::is_valid(id)) {
97-
job_queue.push(pending_job_t(id, output_id));
98-
++added;
99-
}
100-
101-
//grab the first one or bail if its not valid
102-
osmid_t popped = rels_pending_tracker.pop_mark();
103-
if (!id_tracker::is_valid(popped)) {
104-
return;
105-
}
106-
107-
//get all the ones up to the id that was passed in
108-
while (popped < id) {
109-
job_queue.push(pending_job_t(popped, output_id));
110-
++added;
111-
popped = rels_pending_tracker.pop_mark();
112-
}
113-
114-
//make sure to get this one as well and move to the next
115-
if (popped > id) {
116-
if (id_tracker::is_valid(popped)) {
117-
job_queue.push(pending_job_t(popped, output_id));
118-
++added;
119-
}
120-
}
121-
}
122-
123-
void output_multi_t::pending_relation(osmid_t id, int exists)
78+
void output_multi_t::pending_relation(osmid_t id, bool exists)
12479
{
12580
// Try to fetch the relation from the DB
12681
buffer.clear();
@@ -250,10 +205,6 @@ void output_multi_t::reprocess_way(osmium::Way *way, bool exists)
250205
if (m_processor->interests(geometry_processor::interest_relation) &&
251206
exists) {
252207
way_delete(way->id());
253-
auto const rel_ids = m_mid->relations_using_way(way->id());
254-
for (auto const rel_id : rel_ids) {
255-
rels_pending_tracker.mark(rel_id);
256-
}
257208
}
258209

259210
//check if we are keeping this way
@@ -370,19 +321,6 @@ void output_multi_t::delete_from_output(osmid_t id)
370321
}
371322
}
372323

373-
void output_multi_t::merge_pending_relations(output_t *other)
374-
{
375-
auto *const omulti = dynamic_cast<output_multi_t *>(other);
376-
377-
if (omulti) {
378-
osmid_t id;
379-
while (id_tracker::is_valid(
380-
(id = omulti->rels_pending_tracker.pop_mark()))) {
381-
rels_pending_tracker.mark(id);
382-
}
383-
}
384-
}
385-
386324
void output_multi_t::merge_expire_trees(output_t *other)
387325
{
388326
auto *const omulti = dynamic_cast<output_multi_t *>(other);

0 commit comments

Comments
 (0)