Skip to content

Commit 6ef9865

Browse files
committed
add end_id parameter to diff processing functions
1 parent caa4989 commit 6ef9865

File tree

2 files changed

+88
-30
lines changed

2 files changed

+88
-30
lines changed

src/osmium/replication/server.py

Lines changed: 60 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
# For a full list of authors see the git log.
77
""" Helper functions to communicate with replication servers.
88
"""
9-
from typing import NamedTuple, Optional, Any, Iterator, cast, Mapping, Tuple
9+
from typing import NamedTuple, Optional, Any, Iterator, cast, Mapping, Tuple, Dict
1010
import urllib.request as urlrequest
1111
from urllib.error import URLError
1212
import datetime as dt
@@ -67,7 +67,7 @@ def __init__(self, url: str, diff_type: str = 'osc.gz') -> None:
6767

6868
self.baseurl = url
6969
self.diff_type = diff_type
70-
self.extra_request_params: dict[str, Any] = dict(timeout=60, stream=True)
70+
self.extra_request_params: Dict[str, Any] = dict(timeout=60, stream=True)
7171
self.session: Optional[requests.Session] = None
7272
self.retry = Retry(total=3, backoff_factor=0.5, allowed_methods={'GET'},
7373
status_forcelist=[408, 429, 500, 502, 503, 504])
@@ -125,12 +125,19 @@ def _get_url_with_session() -> Iterator[requests.Response]:
125125

126126
return _get_url_with_session()
127127

128-
def collect_diffs(self, start_id: int, max_size: int = 1024) -> Optional[DownloadResult]:
128+
def collect_diffs(self, start_id: int, max_size: Optional[int] = None,
129+
end_id: Optional[int] = None) -> Optional[DownloadResult]:
129130
""" Create a MergeInputReader and download diffs starting with sequence
130-
id `start_id` into it. `max_size`
131-
restricts the number of diffs that are downloaded. The download
132-
stops as soon as either a diff cannot be downloaded or the
133-
unpacked data in memory exceeds `max_size` kB.
131+
id `start_id` into it. `end_id` optionally gives the highest
132+
sequence number to download. `max_size` restricts the number of
133+
diffs that are downloaded by size. If neither `end_id` nor
134+
`max_size` are given, then the download defaults to stopping after 1MB.
135+
136+
The download stops as soon as
137+
1. a diff cannot be downloaded or
138+
2. the end_id (inclusive) is reached or
139+
3. the unpacked data in memory exceeds `max_size` kB or,
140+
when neither `end_id` nor `max_size` is given, 1024kB.
134141
135142
If some data was downloaded, returns a namedtuple with three fields:
136143
`id` contains the sequence id of the last downloaded diff, `reader`
@@ -140,19 +147,25 @@ def collect_diffs(self, start_id: int, max_size: int = 1024) -> Optional[Downloa
140147
Returns None if there was an error during download or no new
141148
data was available.
142149
"""
143-
left_size = max_size * 1024
144-
current_id = start_id
145-
146150
# must not read data newer than the published sequence id
147151
# or we might end up reading partial data
148152
newest = self.get_state_info()
149153

150-
if newest is None or current_id > newest.sequence:
154+
if newest is None or start_id > newest.sequence:
151155
return None
152156

157+
current_id = start_id
158+
left_size: Optional[int] = None
159+
if max_size is not None:
160+
left_size = max_size * 1024
161+
elif end_id is None:
162+
left_size = 1024 * 1024
163+
153164
rd = MergeInputReader()
154165

155-
while left_size > 0 and current_id <= newest.sequence:
166+
while (left_size is None or left_size > 0) \
167+
and (end_id is None or current_id <= end_id) \
168+
and current_id <= newest.sequence:
156169
try:
157170
diffdata = self.get_diff_block(current_id)
158171
except: # noqa: E722
@@ -163,21 +176,32 @@ def collect_diffs(self, start_id: int, max_size: int = 1024) -> Optional[Downloa
163176
return None
164177
break
165178

166-
left_size -= rd.add_buffer(diffdata, self.diff_type)
167-
LOG.debug("Downloaded change %d. (%d kB available in download buffer)",
168-
current_id, left_size / 1024)
179+
diff_size = rd.add_buffer(diffdata, self.diff_type)
180+
if left_size is None:
181+
LOG.debug("Downloaded change %d.", current_id)
182+
else:
183+
left_size -= diff_size
184+
LOG.debug("Downloaded change %d. (%d kB available in download buffer)",
185+
current_id, left_size / 1024)
169186
current_id += 1
170187

171188
return DownloadResult(current_id - 1, rd, newest.sequence)
172189

173190
def apply_diffs(self, handler: BaseHandler, start_id: int,
174-
max_size: int = 1024, idx: str = "",
175-
simplify: bool = True) -> Optional[int]:
191+
max_size: Optional[int] = None,
192+
idx: str = "", simplify: bool = True,
193+
end_id: Optional[int] = None) -> Optional[int]:
176194
""" Download diffs starting with sequence id `start_id`, merge them
177-
together and then apply them to handler `handler`. `max_size`
178-
restricts the number of diffs that are downloaded. The download
179-
stops as soon as either a diff cannot be downloaded or the
180-
unpacked data in memory exceeds `max_size` kB.
195+
together and then apply them to handler `handler`. `end_id`
196+
optionally gives the highest sequence id to download. `max_size`
197+
allows restricting the number of diffs that are downloaded.
198+
Downloaded diffs are temporarily saved in memory and this parameter
199+
ensures that pyosmium doesn't run out of memory. `max_size`
200+
is the maximum size in kB this internal buffer may have.
201+
202+
If neither `end_id` nor `max_size` are given, the download is
203+
restricted to a maximum size of 1MB. The download also
204+
stops when the most recent diff has been processed.
181205
182206
If `idx` is set, a location cache will be created and applied to
183207
the way nodes. You should be aware that diff files usually do not
@@ -197,7 +221,7 @@ def apply_diffs(self, handler: BaseHandler, start_id: int,
197221
The function returns the sequence id of the last diff that was
198222
downloaded or None if the download failed completely.
199223
"""
200-
diffs = self.collect_diffs(start_id, max_size)
224+
diffs = self.collect_diffs(start_id, end_id=end_id, max_size=max_size)
201225

202226
if diffs is None:
203227
return None
@@ -206,19 +230,26 @@ def apply_diffs(self, handler: BaseHandler, start_id: int,
206230

207231
return diffs.id
208232

209-
def apply_diffs_to_file(self, infile: str, outfile: str,
210-
start_id: int, max_size: int = 1024,
233+
def apply_diffs_to_file(self, infile: str, outfile: str, start_id: int,
234+
max_size: Optional[int] = None,
211235
set_replication_header: bool = True,
212236
extra_headers: Optional[Mapping[str, str]] = None,
213-
outformat: Optional[str] = None) -> Optional[Tuple[int, int]]:
237+
outformat: Optional[str] = None,
238+
end_id: Optional[int] = None) -> Optional[Tuple[int, int]]:
214239
""" Download diffs starting with sequence id `start_id`, merge them
215240
with the data from the OSM file named `infile` and write the result
216241
into a file with the name `outfile`. The output file must not yet
217242
exist.
218243
219-
`max_size` restricts the number of diffs that are downloaded. The
220-
download stops as soon as either a diff cannot be downloaded or the
221-
unpacked data in memory exceeds `max_size` kB.
244+
`end_id` optionally gives the highest sequence id to download.
245+
`max_size` allows restricting the number of diffs that are
246+
downloaded. Downloaded diffs are saved in memory and this parameter
247+
ensures that pyosmium doesn't run out of memory. `max_size`
248+
is the maximum size in kB this internal buffer may have.
249+
250+
If neither `end_id` nor `max_size` are given, the
251+
download is restricted to a maximum size of 1MB. The download also
252+
stops when the most recent diff has been processed.
222253
223254
If `set_replication_header` is true then the URL of the replication
224255
server and the sequence id and timestamp of the last diff applied
@@ -235,7 +266,7 @@ def apply_diffs_to_file(self, infile: str, outfile: str,
235266
newest available sequence id if new data has been written or None
236267
if no data was available or the download failed completely.
237268
"""
238-
diffs = self.collect_diffs(start_id, max_size)
269+
diffs = self.collect_diffs(start_id, end_id=end_id, max_size=max_size)
239270

240271
if diffs is None:
241272
return None

test/test_replication.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
from werkzeug.wrappers import Response
1515

16-
from helpers import mkdate, CountingHandler
16+
from helpers import mkdate, CountingHandler, IDCollector
1717

1818
import osmium.replication.server as rserv
1919
import osmium.replication
@@ -223,6 +223,33 @@ def test_apply_diffs_count(httpserver):
223223
assert h.counts == [1, 1, 1, 0]
224224

225225

226+
@pytest.mark.parametrize('end_id,max_size, actual_end', [(107, None, 107),
227+
(None, 512, 108),
228+
(105, 512, 105),
229+
(110, 512, 108),
230+
(None, None, 115)])
231+
def test_apply_diffs_endid(httpserver, end_id, max_size, actual_end):
232+
httpserver.expect_request('/state.txt').respond_with_data("""\
233+
sequenceNumber=140
234+
timestamp=2017-08-26T11\\:04\\:02Z
235+
""")
236+
for i in range(100, 141):
237+
httpserver.expect_request(f'/000/000/{i}.opl')\
238+
.respond_with_data(f"r{i} M" + ",".join(f"n{i}@" for i in range(1, 3000)))
239+
240+
with rserv.ReplicationServer(httpserver.url_for(''), "opl") as svr:
241+
res = svr.collect_diffs(101, end_id=end_id, max_size=max_size)
242+
243+
assert res is not None
244+
assert res.id == actual_end
245+
assert res.newest == 140
246+
247+
ids = IDCollector()
248+
res.reader.apply(ids)
249+
250+
assert ids.relations == list(range(101, actual_end + 1))
251+
252+
226253
def test_apply_diffs_without_simplify(httpserver):
227254
httpserver.expect_ordered_request('/state.txt').respond_with_data("""\
228255
sequenceNumber=100

0 commit comments

Comments
 (0)