Skip to content

Commit 17a439f

Browse files
committed
replace remaining uses of implicit thread pool
1 parent 3cebe1c commit 17a439f

File tree

9 files changed

+122
-80
lines changed

9 files changed

+122
-80
lines changed

lib/simple_writer.cc

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
*
33
* This file is part of pyosmium. (https://osmcode.org/pyosmium/)
44
*
5-
* Copyright (C) 2024 Sarah Hoffmann <[email protected]> and others.
5+
* Copyright (C) 2025 Sarah Hoffmann <[email protected]> and others.
66
* For a full list of authors see the git log.
77
*/
88
#include <pybind11/pybind11.h>
@@ -14,10 +14,12 @@
1414
#include <osmium/io/header.hpp>
1515
#include <osmium/memory/buffer.hpp>
1616
#include <osmium/builder/osm_object_builder.hpp>
17+
#include <osmium/thread/pool.hpp>
1718

1819
#include "cast.h"
1920
#include "osm_base_objects.h"
2021
#include "base_handler.h"
22+
#include "io.h"
2123

2224
#include <filesystem>
2325

@@ -30,20 +32,10 @@ class SimpleWriter : public pyosmium::BaseHandler
3032
enum { BUFFER_WRAP = 4096 };
3133

3234
public:
33-
SimpleWriter(const char* filename, size_t bufsz, osmium::io::Header const *header,
34-
bool overwrite, const std::string &filetype)
35-
: writer(osmium::io::File(filename, filetype),
36-
header ? *header : osmium::io::Header(),
37-
overwrite ? osmium::io::overwrite::allow : osmium::io::overwrite::no),
38-
buffer(bufsz < 2 * BUFFER_WRAP ? 2 * BUFFER_WRAP : bufsz,
39-
osmium::memory::Buffer::auto_grow::yes),
40-
buffer_size(buffer.capacity()) // same rounding to BUFFER_WRAP
41-
{}
42-
43-
SimpleWriter(osmium::io::File file, size_t bufsz, osmium::io::Header const *header,
44-
bool overwrite)
45-
: writer(file, header ? *header : osmium::io::Header(),
46-
overwrite ? osmium::io::overwrite::allow : osmium::io::overwrite::no),
35+
SimpleWriter(osmium::io::File file, unsigned long bufsz,
36+
osmium::io::Header const *header, bool overwrite,
37+
osmium::thread::Pool *pool)
38+
: writer(file, header, overwrite, pool),
4739
buffer(bufsz < 2 * BUFFER_WRAP ? 2 * BUFFER_WRAP : bufsz,
4840
osmium::memory::Buffer::auto_grow::yes),
4941
buffer_size(buffer.capacity()) // same rounding to BUFFER_WRAP
@@ -154,8 +146,8 @@ class SimpleWriter : public pyosmium::BaseHandler
154146
void close()
155147
{
156148
if (buffer) {
157-
writer(std::move(buffer));
158-
writer.close();
149+
(*writer.get())(std::move(buffer));
150+
writer.get()->close();
159151
buffer = osmium::memory::Buffer();
160152
}
161153
}
@@ -335,11 +327,11 @@ class SimpleWriter : public pyosmium::BaseHandler
335327
osmium::memory::Buffer new_buffer(buffer_size, osmium::memory::Buffer::auto_grow::yes);
336328
using std::swap;
337329
swap(buffer, new_buffer);
338-
writer(std::move(new_buffer));
330+
(*writer.get())(std::move(new_buffer));
339331
}
340332
}
341333

342-
osmium::io::Writer writer;
334+
pyosmium::PyWriter writer;
343335
osmium::memory::Buffer buffer;
344336
size_t buffer_size;
345337
};
@@ -351,22 +343,38 @@ namespace pyosmium {
351343
void init_simple_writer(pybind11::module &m)
352344
{
353345
py::class_<SimpleWriter, BaseHandler>(m, "SimpleWriter")
354-
.def(py::init<const char*, unsigned long, osmium::io::Header const *, bool, const std::string&>(),
346+
.def(py::init<>([] (std::string file, unsigned long bufsz,
347+
osmium::io::Header const *header, bool overwrite,
348+
osmium::thread::Pool *pool) {
349+
return new SimpleWriter(osmium::io::File(std::move(file)), bufsz,
350+
header, overwrite, pool);
351+
}),
352+
py::keep_alive<1, 6>(),
355353
py::arg("filename"), py::arg("bufsz") = 4096*1024,
356354
py::arg("header") = nullptr,
357355
py::arg("overwrite") = false,
358-
py::arg("filetype") = "")
356+
py::arg("thread_pool") = nullptr
357+
)
359358
.def(py::init<>([] (std::filesystem::path const &file, unsigned long bufsz,
360-
osmium::io::Header const *header, bool overwrite) {
361-
return new SimpleWriter(file.string().c_str(), bufsz, header, overwrite, "");
359+
osmium::io::Header const *header, bool overwrite,
360+
osmium::thread::Pool *pool) {
361+
return new SimpleWriter(osmium::io::File(file.string()), bufsz,
362+
header, overwrite, pool);
362363
}),
364+
py::keep_alive<1, 6>(),
363365
py::arg("filename"), py::arg("bufsz") = 4096*1024,
364366
py::arg("header") = nullptr,
365-
py::arg("overwrite") = false)
366-
.def(py::init<osmium::io::File, unsigned long, osmium::io::Header const *, bool>(),
367+
py::arg("overwrite") = false,
368+
py::arg("thread_pool") = nullptr
369+
)
370+
.def(py::init<osmium::io::File, unsigned long, osmium::io::Header const *,
371+
bool, osmium::thread::Pool *>(),
372+
py::keep_alive<1, 6>(),
367373
py::arg("filename"), py::arg("bufsz") = 4096*1024,
368374
py::arg("header") = nullptr,
369-
py::arg("overwrite") = false)
375+
py::arg("overwrite") = false,
376+
py::arg("thread_pool") = nullptr
377+
)
370378
.def("add_node", &SimpleWriter::add_node, py::arg("node"))
371379
.def("add_way", &SimpleWriter::add_way, py::arg("way"))
372380
.def("add_relation", &SimpleWriter::add_relation, py::arg("relation"))

src/osmium/_osmium.pyi

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@
22
#
33
# This file is part of pyosmium. (https://osmcode.org/pyosmium/)
44
#
5-
# Copyright (C) 2024 Sarah Hoffmann <[email protected]> and others.
5+
# Copyright (C) 2025 Sarah Hoffmann <[email protected]> and others.
66
# For a full list of authors see the git log.
77
from typing import ByteString, Union, Optional, Any
88
import os
99

1010
from .osm import osm_entity_bits
1111
from .osm.types import OSMEntity
1212
from .index import LocationTable, IdSet
13-
from .io import Reader, Writer, Header, File, FileBuffer
13+
from .io import Reader, Writer, Header, File, FileBuffer, ThreadPool
1414

1515
# Placeholder for more narrow type definition to come
1616
HandlerLike = object
@@ -127,18 +127,16 @@ class SimpleWriter(BaseHandler):
127127
when writing is finished.
128128
"""
129129
def __init__(self, file: Union[str, 'os.PathLike[str]', File],
130-
bufsz: int= ...,
131-
header: Optional[Header]= ..., overwrite: bool= ...,
132-
filetype: str= ...) -> None:
130+
bufsz: int=4096*1024,
131+
header: Optional[Header]=None, overwrite: bool=False,
132+
thread_pool: Optional[ThreadPool]=None) -> None:
133133
""" Initiate a new writer for the given file. The writer will
134134
refuse to overwrite an already existing file unless _overwrite_
135135
is explicitly set to `True`.
136136
137137
The file type is usually determined from the file extension.
138138
If you want to explicitly set the filetype (for example, when
139139
writing to standard output '-'), then use a File object.
140-
Using the _filetype_ parameter to set the file type is deprecated
141-
and only works when the file is a string.
142140
143141
The _header_ parameter can be used to set a custom header in
144142
the output file. What kind of information can be written into
@@ -149,6 +147,14 @@ class SimpleWriter(BaseHandler):
149147
size is 4MB. Larger buffers are normally better but you should
150148
be aware that there are normally multiple buffers in use during
151149
the write process.
150+
151+
By default, the writer creates a private
152+
[ThreadPool][osmium.io.ThreadPool] which it may
153+
use to parallelize writing to the output. Alternatively you
154+
may hand in an externally created thread pool. This may be useful
155+
when you create many writers in parallel and want them to share
156+
a single thread pool or when you want to customize the size
157+
of the thread pool.
152158
"""
153159
def add_node(self, node: object) -> None:
154160
""" Add a new node to the file. The node may be a

src/osmium/back_reference_writer.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,15 @@
22
#
33
# This file is part of pyosmium. (https://osmcode.org/pyosmium/)
44
#
5-
# Copyright (C) 2024 Sarah Hoffmann <[email protected]> and others.
5+
# Copyright (C) 2025 Sarah Hoffmann <[email protected]> and others.
66
# For a full list of authors see the git log.
7-
from typing import Any, Union
7+
from typing import Any, Union, Optional
88
from pathlib import Path
99
from tempfile import TemporaryDirectory
1010
import os
1111

1212
from osmium._osmium import SimpleWriter
13-
from osmium.io import File, FileBuffer
13+
from osmium.io import File, FileBuffer, ThreadPool
1414
from osmium.file_processor import FileProcessor, zip_processors
1515
from osmium import IdTracker
1616

@@ -30,7 +30,7 @@ class BackReferenceWriter:
3030
def __init__(self, outfile: Union[str, 'os.PathLike[str]', File],
3131
ref_src: Union[str, 'os.PathLike[str]', File, FileBuffer],
3232
overwrite: bool = False, remove_tags: bool = True,
33-
relation_depth: int = 0):
33+
relation_depth: int = 0, thread_pool: Optional[ThreadPool] = None):
3434
""" Create a new writer.
3535
3636
`outfile` is the name of the output file to write. The file must
@@ -47,10 +47,17 @@ def __init__(self, outfile: Union[str, 'os.PathLike[str]', File],
4747
The writer will not complete nested relations by default. If you
4848
need nested relations, set `relation_depth` to the minimum depth
4949
to which relations shall be completed.
50+
51+
By default, the writer creates a private
52+
[ThreadPool][osmium.io.ThreadPool] which it
53+
uses to parallelize IO operations. Alternatively you
54+
may hand in an externally created thread pool.
5055
"""
5156
self.outfile = outfile
5257
self.tmpdir = TemporaryDirectory()
53-
self.writer = SimpleWriter(str(Path(self.tmpdir.name, 'back_writer.osm.pbf')))
58+
self.thread_pool = thread_pool or ThreadPool()
59+
self.writer = SimpleWriter(Path(self.tmpdir.name, 'back_writer.osm.pbf'),
60+
thread_pool=self.thread_pool)
5461
self.overwrite = overwrite
5562
self.remove_tags = remove_tags
5663
self.id_tracker = IdTracker()
@@ -102,10 +109,13 @@ def close(self) -> None:
102109
self.id_tracker.complete_backward_references(self.ref_src,
103110
relation_depth=self.relation_depth)
104111

105-
fp1 = FileProcessor(str(Path(self.tmpdir.name, 'back_writer.osm.pbf')))
106-
fp2 = FileProcessor(self.ref_src).with_filter(self.id_tracker.id_filter())
112+
fp1 = FileProcessor(Path(self.tmpdir.name, 'back_writer.osm.pbf'),
113+
thread_pool=self.thread_pool)
114+
fp2 = FileProcessor(self.ref_src, thread_pool=self.thread_pool
115+
).with_filter(self.id_tracker.id_filter())
107116

108-
with SimpleWriter(self.outfile, overwrite=self.overwrite) as writer:
117+
with SimpleWriter(self.outfile, overwrite=self.overwrite,
118+
thread_pool=self.thread_pool) as writer:
109119
for o1, o2 in zip_processors(fp1, fp2):
110120
if o1:
111121
writer.add(o1)

src/osmium/file_processor.py

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
import osmium
1111
from osmium.index import LocationTable
12-
from osmium.io import File, FileBuffer
12+
from osmium.io import File, FileBuffer, ThreadPool, Reader
1313
from osmium.osm.types import OSMEntity
1414

1515

@@ -20,7 +20,8 @@ class FileProcessor:
2020
"""
2121

2222
def __init__(self, indata: Union[File, FileBuffer, str, 'os.PathLike[str]'],
23-
entities: osmium.osm.osm_entity_bits = osmium.osm.ALL) -> None:
23+
entities: osmium.osm.osm_entity_bits = osmium.osm.ALL,
24+
thread_pool: Optional[ThreadPool] = None) -> None:
2425
""" Initialise a new file processor for the given input source _indata_.
2526
This may either be a filename, an instance of [File](IO.md#osmium.io.File)
2627
or buffered data in form of a [FileBuffer](IO.md#osmium.io.FileBuffer).
@@ -30,6 +31,13 @@ def __init__(self, indata: Union[File, FileBuffer, str, 'os.PathLike[str]'],
3031
directly at the source file and will never be passed to any filters
3132
including the location and area processors. You usually should not
3233
be restricting objects, when using those.
34+
35+
By default, pyosmium will create a private thread pool, which is
36+
used to parallelize reading of the file. Alternatively you may
37+
explicitly create a thread pool and hand it to the FileProcessor.
38+
This may be necessary if you plan to run many processors in
39+
parallel and want them to share a common thread pool or if you
40+
want to change the default settings of the thread pool.
3341
"""
3442
self._file = indata
3543
self._entities = entities
@@ -38,13 +46,14 @@ def __init__(self, indata: Union[File, FileBuffer, str, 'os.PathLike[str]'],
3846
self._filters: List['osmium._osmium.HandlerLike'] = []
3947
self._area_filters: List['osmium._osmium.HandlerLike'] = []
4048
self._filtered_handler: Optional['osmium._osmium.HandlerLike'] = None
49+
self._thread_pool = thread_pool or ThreadPool()
4150

4251
@property
4352
def header(self) -> osmium.io.Header:
4453
""" (read-only) [Header](IO.md#osmium.io.Header) information
4554
for the file to be read.
4655
"""
47-
return osmium.io.Reader(self._file, osmium.osm.NOTHING).header()
56+
return Reader(self._file, osmium.osm.NOTHING, self._thread_pool).header()
4857

4958
@property
5059
def node_location_storage(self) -> Optional[LocationTable]:
@@ -153,31 +162,28 @@ def __iter__(self) -> Iterator[OSMEntity]:
153162
handlers.append(lh)
154163

155164
if self._area_handler is None:
156-
reader = osmium.io.Reader(self._file, self._entities)
157-
it = osmium.OsmFileIterator(reader, *handlers, *self._filters)
158-
if self._filtered_handler:
159-
it.set_filtered_handler(self._filtered_handler)
160-
yield from it
165+
with Reader(self._file, self._entities, thread_pool=self._thread_pool) as reader:
166+
it = osmium.OsmFileIterator(reader, *handlers, *self._filters)
167+
if self._filtered_handler:
168+
it.set_filtered_handler(self._filtered_handler)
169+
yield from it
161170
return
162171

163172
# need areas, do two pass handling
164-
rd = osmium.io.Reader(self._file, osmium.osm.RELATION)
165-
try:
173+
with Reader(self._file, osmium.osm.RELATION, thread_pool=self._thread_pool) as rd:
166174
osmium.apply(rd, *self._area_filters, self._area_handler.first_pass_handler())
167-
finally:
168-
rd.close()
169175

170176
buffer_it = osmium.BufferIterator(*self._filters)
171177
handlers.append(self._area_handler.second_pass_to_buffer(buffer_it))
172178

173-
reader = osmium.io.Reader(self._file, self._entities)
174-
it = osmium.OsmFileIterator(reader, *handlers, *self._filters)
175-
if self._filtered_handler:
176-
it.set_filtered_handler(self._filtered_handler)
177-
for obj in it:
178-
yield obj
179-
if buffer_it:
180-
yield from buffer_it
179+
with Reader(self._file, self._entities, thread_pool=self._thread_pool) as reader:
180+
it = osmium.OsmFileIterator(reader, *handlers, *self._filters)
181+
if self._filtered_handler:
182+
it.set_filtered_handler(self._filtered_handler)
183+
for obj in it:
184+
yield obj
185+
if buffer_it:
186+
yield from buffer_it
181187

182188
# catch anything after the final flush
183189
if buffer_it:

src/osmium/forward_reference_writer.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
#
33
# This file is part of pyosmium. (https://osmcode.org/pyosmium/)
44
#
5-
# Copyright (C) 2024 Sarah Hoffmann <[email protected]> and others.
5+
# Copyright (C) 2025 Sarah Hoffmann <[email protected]> and others.
66
# For a full list of authors see the git log.
77
from typing import Any, Optional, Union
88
from pathlib import Path
@@ -11,7 +11,7 @@
1111

1212
from osmium._osmium import SimpleWriter
1313
from osmium import IdTracker
14-
from osmium.io import File, FileBuffer
14+
from osmium.io import File, FileBuffer, ThreadPool
1515
from osmium.file_processor import FileProcessor, zip_processors
1616

1717

@@ -32,7 +32,8 @@ def __init__(self, outfile: Union[str, 'os.PathLike[str]', File],
3232
ref_src: Union[str, 'os.PathLike[str]', File, FileBuffer],
3333
overwrite: bool = False, back_references: bool = True,
3434
remove_tags: bool = True, forward_relation_depth: int = 0,
35-
backward_relation_depth: int = 1) -> None:
35+
backward_relation_depth: int = 1,
36+
thread_pool: Optional[ThreadPool] = None) -> None:
3637
""" Create a new writer.
3738
3839
`outfile` is the name of the output file to write. The file must
@@ -52,7 +53,9 @@ def __init__(self, outfile: Union[str, 'os.PathLike[str]', File],
5253
"""
5354
self.outfile = outfile
5455
self.tmpdir: Optional['TemporaryDirectory[Any]'] = TemporaryDirectory()
55-
self.writer = SimpleWriter(str(Path(self.tmpdir.name, 'forward_writer.osm.pbf')))
56+
self.thread_pool = thread_pool or ThreadPool()
57+
self.writer = SimpleWriter(Path(self.tmpdir.name, 'forward_writer.osm.pbf'),
58+
thread_pool=self.thread_pool)
5659
self.overwrite = overwrite
5760
self.back_references = back_references
5861
self.id_tracker = IdTracker()
@@ -118,10 +121,13 @@ def close(self) -> None:
118121
self.ref_src,
119122
relation_depth=self.backward_relation_depth)
120123

121-
fp1 = FileProcessor(Path(self.tmpdir.name, 'forward_writer.osm.pbf'))
122-
fp2 = FileProcessor(self.ref_src).with_filter(self.id_tracker.id_filter())
124+
fp1 = FileProcessor(Path(self.tmpdir.name, 'forward_writer.osm.pbf'),
125+
thread_pool=self.thread_pool)
126+
fp2 = FileProcessor(self.ref_src, thread_pool=self.thread_pool
127+
).with_filter(self.id_tracker.id_filter())
123128

124-
with SimpleWriter(self.outfile, overwrite=self.overwrite) as writer:
129+
with SimpleWriter(self.outfile, overwrite=self.overwrite,
130+
thread_pool=self.thread_pool) as writer:
125131
for o1, o2 in zip_processors(fp1, fp2):
126132
if o1:
127133
writer.add(o1)

src/osmium/helper.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ class WriteHandler(SimpleWriter):
5656
documentation.
5757
"""
5858

59-
def __init__(self, filename: str, bufsz: int = 4096*1024, filetype: str = "") -> None:
60-
super().__init__(filename, bufsz=bufsz, filetype=filetype)
59+
def __init__(self, filename: str, bufsz: int = 4096*1024) -> None:
60+
super().__init__(filename, bufsz=bufsz)
6161

6262

6363
def _merge_apply(self: MergeInputReader, *handlers: 'HandlerLike',

0 commit comments

Comments
 (0)