Skip to content

Commit 2c4c422

Browse files
committed
Move process_file(s) function out of osmdata.
1 parent 3997cf1 commit 2c4c422

File tree

9 files changed

+300
-269
lines changed

9 files changed

+300
-269
lines changed

src/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ set(osm2pgsql_lib_SOURCES
66
expire-tiles.cpp
77
gazetteer-style.cpp
88
geometry-processor.cpp
9+
input.cpp
910
logging.cpp
1011
middle-pgsql.cpp
1112
middle-ram.cpp

src/input.cpp

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
2+
#include <memory>
3+
#include <queue>
4+
#include <stdexcept>
5+
#include <vector>
6+
7+
#include <osmium/io/any_input.hpp>
8+
#include <osmium/visitor.hpp>
9+
10+
#include "format.hpp"
11+
#include "input.hpp"
12+
#include "osmdata.hpp"
13+
#include "progress-display.hpp"
14+
15+
type_id_version check_input(type_id_version const &last, type_id_version curr)
16+
{
17+
if (curr.id < 0) {
18+
throw std::runtime_error{
19+
"Negative OSM object ids are not allowed: {} id {}."_format(
20+
osmium::item_type_to_name(curr.type), curr.id)};
21+
}
22+
23+
if (last.type == curr.type) {
24+
if (last.id < curr.id) {
25+
return curr;
26+
}
27+
28+
if (last.id > curr.id) {
29+
throw std::runtime_error{
30+
"Input data is not ordered: {} id {} after {}."_format(
31+
osmium::item_type_to_name(last.type), curr.id, last.id)};
32+
}
33+
34+
if (last.version < curr.version) {
35+
return curr;
36+
}
37+
38+
throw std::runtime_error{
39+
"Input data is not ordered: {} id {} version {} after {}."_format(
40+
osmium::item_type_to_name(last.type), curr.id, curr.version,
41+
last.version)};
42+
}
43+
44+
if (item_type_to_nwr_index(last.type) <=
45+
item_type_to_nwr_index(curr.type)) {
46+
return curr;
47+
}
48+
49+
throw std::runtime_error{"Input data is not ordered: {} after {}."_format(
50+
osmium::item_type_to_name(curr.type),
51+
osmium::item_type_to_name(last.type))};
52+
}
53+
54+
type_id_version check_input(type_id_version const &last,
55+
osmium::OSMObject const &object)
56+
{
57+
return check_input(last, {object.type(), object.id(), object.version()});
58+
}
59+
60+
/**
61+
* A data source is where we get the OSM objects from, one at a time. It
62+
* wraps the osmium::io::Reader.
63+
*/
64+
class data_source_t
65+
{
66+
public:
67+
explicit data_source_t(osmium::io::File const &file)
68+
: m_reader(new osmium::io::Reader{file})
69+
{
70+
get_next_nonempty_buffer();
71+
m_last = check_input(m_last, *m_it);
72+
}
73+
74+
bool empty() const noexcept { return !m_buffer; }
75+
76+
bool next()
77+
{
78+
assert(!empty());
79+
++m_it;
80+
81+
while (m_it == m_end) {
82+
if (!get_next_nonempty_buffer()) {
83+
return false;
84+
}
85+
}
86+
87+
m_last = check_input(m_last, *m_it);
88+
return true;
89+
}
90+
91+
osmium::OSMObject *get() noexcept
92+
{
93+
assert(!empty());
94+
return &*m_it;
95+
}
96+
97+
std::size_t offset() const noexcept { return m_reader->offset(); }
98+
99+
void close()
100+
{
101+
m_reader->close();
102+
m_reader.reset();
103+
}
104+
105+
private:
106+
bool get_next_nonempty_buffer()
107+
{
108+
while ((m_buffer = m_reader->read())) {
109+
m_it = m_buffer.begin<osmium::OSMObject>();
110+
m_end = m_buffer.end<osmium::OSMObject>();
111+
if (m_it != m_end) {
112+
return true;
113+
}
114+
}
115+
return false;
116+
}
117+
118+
using iterator = osmium::memory::Buffer::t_iterator<osmium::OSMObject>;
119+
120+
std::unique_ptr<osmium::io::Reader> m_reader;
121+
osmium::memory::Buffer m_buffer{};
122+
iterator m_it{};
123+
iterator m_end{};
124+
type_id_version m_last = {osmium::item_type::node, 0, 0};
125+
126+
}; // class data_source_t
127+
128+
/**
129+
* A element in a priority queue of OSM objects. Holds a pointer to the OSM
130+
* object as well as a pointer to the source the OSM object came from.
131+
*/
132+
class queue_element_t
133+
{
134+
public:
135+
queue_element_t(osmium::OSMObject *object, data_source_t *source) noexcept
136+
: m_object(object), m_source(source)
137+
{}
138+
139+
osmium::OSMObject const &object() const noexcept { return *m_object; }
140+
141+
osmium::OSMObject &object() noexcept { return *m_object; }
142+
143+
data_source_t *data_source() const noexcept { return m_source; }
144+
145+
friend bool operator<(queue_element_t const &lhs,
146+
queue_element_t const &rhs) noexcept
147+
{
148+
// This is needed for the priority queue. We want objects with smaller
149+
// id (and earlier versions of the same object) to come first, but
150+
// the priority queue expects largest first. So we need to reverse the
151+
// comparison here.
152+
return lhs.object() > rhs.object();
153+
}
154+
155+
friend bool operator==(queue_element_t const &lhs,
156+
queue_element_t const &rhs) noexcept
157+
{
158+
return lhs.object().type() == rhs.object().type() &&
159+
lhs.object().id() == rhs.object().id();
160+
}
161+
162+
friend bool operator!=(queue_element_t const &lhs,
163+
queue_element_t const &rhs) noexcept
164+
{
165+
return !(lhs == rhs);
166+
}
167+
168+
private:
169+
osmium::OSMObject *m_object;
170+
data_source_t *m_source;
171+
172+
}; // class queue_element_t
173+
174+
void process_file(osmium::io::File const &file, osmdata_t &osmdata,
175+
progress_display_t &progress, bool append)
176+
{
177+
osmium::io::Reader reader{file};
178+
type_id_version last{osmium::item_type::node, 0, 0};
179+
180+
while (osmium::memory::Buffer buffer = reader.read()) {
181+
for (auto &object : buffer.select<osmium::OSMObject>()) {
182+
last = check_input(last, object);
183+
if (!append && object.deleted()) {
184+
throw std::runtime_error{
185+
"Input file contains deleted objects but "
186+
"you are not in append mode."};
187+
}
188+
osmium::apply_item(object, osmdata, progress);
189+
}
190+
}
191+
192+
osmdata.flush();
193+
194+
reader.close();
195+
}
196+
197+
void process_files(std::vector<osmium::io::File> const &files,
198+
osmdata_t &osmdata, progress_display_t &progress,
199+
bool append)
200+
{
201+
if (files.size() == 1) {
202+
process_file(files.front(), osmdata, progress, append);
203+
return;
204+
}
205+
206+
std::vector<data_source_t> data_sources;
207+
data_sources.reserve(files.size());
208+
209+
std::priority_queue<queue_element_t> queue;
210+
211+
for (osmium::io::File const &file : files) {
212+
data_sources.emplace_back(file);
213+
214+
if (!data_sources.back().empty()) {
215+
queue.emplace(data_sources.back().get(), &data_sources.back());
216+
}
217+
}
218+
219+
while (!queue.empty()) {
220+
auto element = queue.top();
221+
queue.pop();
222+
if (queue.empty() || element != queue.top()) {
223+
if (!append && element.object().deleted()) {
224+
throw std::runtime_error{
225+
"Input file contains deleted objects but "
226+
"you are not in append mode."};
227+
}
228+
osmium::apply_item(element.object(), osmdata, progress);
229+
}
230+
231+
auto *source = element.data_source();
232+
if (source->next()) {
233+
queue.emplace(source->get(), source);
234+
}
235+
}
236+
237+
osmdata.flush();
238+
239+
for (auto &data_source : data_sources) {
240+
data_source.close();
241+
}
242+
}

src/input.hpp

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#ifndef OSM2PGSQL_INPUT_HPP
2+
#define OSM2PGSQL_INPUT_HPP
3+
4+
/**
5+
* \file
6+
*
7+
* This file is part of osm2pgsql (https://github.com/openstreetmap/osm2pgsql).
8+
*
9+
* It contains the functions reading and checking the input data.
10+
*/
11+
12+
#include <vector>
13+
14+
#include <osmium/fwd.hpp>
15+
#include <osmium/io/file.hpp>
16+
17+
#include "osmtypes.hpp"
18+
19+
class osmdata_t;
20+
class progress_display_t;
21+
22+
struct type_id_version
23+
{
24+
osmium::item_type type;
25+
osmid_t id;
26+
osmium::object_version_type version;
27+
};
28+
29+
/**
30+
* Compare two tuples (type, id, version) throw a descriptive error if either
31+
* the curr id is negative or if the data is not ordered.
32+
*/
33+
type_id_version check_input(type_id_version const &last, type_id_version curr);
34+
35+
type_id_version check_input(type_id_version const &last,
36+
osmium::OSMObject const &object);
37+
38+
/**
39+
* Process the specified OSM files (stage 1a).
40+
*/
41+
void process_files(std::vector<osmium::io::File> const &files,
42+
osmdata_t &osmdata, progress_display_t &progress,
43+
bool append);
44+
45+
#endif // OSM2PGSQL_INPUT_HPP

src/osm2pgsql.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
#include "db-check.hpp"
2727
#include "format.hpp"
28+
#include "input.hpp"
2829
#include "logging.hpp"
2930
#include "middle-pgsql.hpp"
3031
#include "middle-ram.hpp"
@@ -36,6 +37,8 @@
3637
#include "util.hpp"
3738
#include "version.hpp"
3839

40+
#include <osmium/io/file.hpp>
41+
3942
#include <ctime>
4043
#include <exception>
4144
#include <memory>
@@ -120,7 +123,7 @@ int main(int argc, char *argv[])
120123
util::timer_t timer_parse;
121124

122125
progress_display_t progress{get_logger().show_progress()};
123-
osmdata.process_files(files, progress);
126+
process_files(files, osmdata, progress, options.append);
124127

125128
progress.print_status(std::time(nullptr));
126129
fmt::print(stderr, " parse time: {}\n",

0 commit comments

Comments
 (0)