Skip to content

Commit 5468b37

Browse files
committed
add a script for simple updating of a database
The script needs python, pyosmium and psycopg2. It holds the replication state in a table in the database.
1 parent e15cd1a commit 5468b37

File tree

1 file changed

+350
-0
lines changed

1 file changed

+350
-0
lines changed

osm2pgsql-replication

Lines changed: 350 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,350 @@
1+
#!/usr/bin/env python3
2+
3+
# SPDX-License-Identifier: GPL-2.0-or-later
4+
#
5+
# This file is part of osm2pgsql (https://osm2pgsql.org/).
6+
#
7+
# Copyright (C) 2006-2021 by the osm2pgsql developer community.
8+
# For a full list of authors see the git log.
9+
10+
"""
11+
Update an osm2pgsql database with changes from a OSM replication server.
12+
13+
This tool initialises the updating process by looking at the import file
14+
or the newest object in the database. The state is then saved in a table
15+
in the database. Subsequent runs download newly available data and apply
16+
it to the database.
17+
18+
See the help of the 'init' and 'update' command for more information on
19+
how to use %(prog)s.
20+
"""
21+
22+
from argparse import ArgumentParser, RawDescriptionHelpFormatter
23+
import datetime as dt
24+
import json
25+
import logging
26+
import sys
27+
import subprocess
28+
import tempfile
29+
import traceback
30+
from pathlib import Path
31+
import urllib.request as urlrequest
32+
33+
import psycopg2
34+
35+
from osmium.replication.server import ReplicationServer
36+
from osmium.replication.utils import get_replication_header
37+
from osmium import WriteHandler
38+
39+
LOG = logging.getLogger()
40+
41+
def connect(args):
42+
""" Create a connection from the given command line arguments.
43+
"""
44+
return psycopg2.connect(dbname=args.database, user=args.username,
45+
host=args.host, port=args.port)
46+
47+
48+
def compute_database_date(conn, prefix):
49+
""" Determine the date of the database from the newest object in the
50+
database.
51+
"""
52+
# First, find the way with the highest ID in the database
53+
# Using nodes would be more reliable but those are not cached by osm2pgsql.
54+
with conn.cursor() as cur:
55+
cur.execute("SELECT max(id) FROM {}_ways".format(prefix))
56+
osmid = cur.fetchone()[0] if cur.rowcount == 1 else None
57+
58+
if osmid is None:
59+
LOG.fatal("No data found in the database.")
60+
return None
61+
62+
LOG.debug("Using way id %d for timestamp lookup", osmid)
63+
# Get the node from the API to find the timestamp when it was created.
64+
url = 'https://www.openstreetmap.org/api/0.6/way/{}/1'.format(osmid)
65+
headers = {"User-Agent" : "osm2pgsql-update",
66+
"Accept" : "application/json"}
67+
with urlrequest.urlopen(urlrequest.Request(url, headers=headers)) as response:
68+
data = json.loads(response.read().decode('utf-8'))
69+
70+
if not data.get('elements') or not 'timestamp' in data['elements'][0]:
71+
LOG.fatal("The way data downloaded from the API does not contain valid data.\n"
72+
"URL used: %s", node_url)
73+
return None
74+
75+
date = data['elements'][0]['timestamp']
76+
LOG.debug("Found timestamp %s", date)
77+
78+
try:
79+
date = dt.datetime.fromisoformat(date.replace('Z', '+00:00'))
80+
except ValueError:
81+
LOG.fatal("Cannot parse timestamp '%s'", date)
82+
return None
83+
84+
return date.replace(tzinfo=dt.timezone.utc)
85+
86+
87+
def setup_replication_state(conn, table, base_url, seq, date):
88+
""" (Re)create the table for the replication state and fill it with
89+
the given state.
90+
"""
91+
with conn.cursor() as cur:
92+
cur.execute('DROP TABLE IF EXISTS "{}"'.format(table))
93+
cur.execute("""CREATE TABLE "{}"
94+
(url TEXT,
95+
sequence INTEGER,
96+
importdate TIMESTAMP WITH TIME ZONE)
97+
""".format(table))
98+
cur.execute('INSERT INTO "{}" VALUES(%s, %s, %s)'.format(table),
99+
(base_url, seq, date))
100+
conn.commit()
101+
102+
103+
def update_replication_state(conn, table, seq, date):
104+
""" Update sequence and date in the replication state table.
105+
The table is assumed to exist.
106+
"""
107+
with conn.cursor() as cur:
108+
if date is not None:
109+
cur.execute('UPDATE "{}" SET sequence=%s, importdate=%s'.format(table),
110+
(seq, date))
111+
else:
112+
cur.execute('UPDATE "{}" SET sequence=%s'.format(table),
113+
(seq,))
114+
115+
conn.commit()
116+
117+
118+
def init(conn, args):
119+
"""\
120+
There are two ways to initialise the replication process: if you have imported
121+
from a file that contains replication source information, then the
122+
initialisation process can use this and set up replication from there.
123+
Use the command '%(prog)s --osm-file <filename>' for this.
124+
125+
If the file has no replication information or you don't have the initial
126+
import file anymore then replication can be set up according to
127+
the data found in the database. It checks the planet_osm_way table for the
128+
newest way in the database and then queries the OSM API when the way was
129+
created. The date is used as the start date for replication. In this mode
130+
the minutely diffs from the OSM servers are used as a source. You can change
131+
this with the '--server' parameter.
132+
"""
133+
if args.osm_file is None:
134+
date = compute_database_date(conn, args.prefix)
135+
if date is None:
136+
return 1
137+
138+
date = date - dt.timedelta(hours=3)
139+
base_url = args.server
140+
seq = None
141+
else:
142+
base_url, seq, date = get_replication_header(args.osm_file)
143+
if base_url is None or (seq is None and date is None):
144+
LOG.fatal("File '%s' has no usable replication headers. Use '--server' instead.")
145+
return 1
146+
147+
repl = ReplicationServer(base_url)
148+
if seq is None:
149+
seq = repl.timestamp_to_sequence(date)
150+
151+
if seq is None:
152+
LOG.fatal("Cannot reach the configured replication service '%s'.\n"
153+
"Does the URL point to a directory containing OSM update data?",
154+
base_url)
155+
return 1
156+
157+
if date is None:
158+
state = repl.get_state_info(seq)
159+
if state is None:
160+
LOG.fatal("Cannot reach the configured replication service '%s'.\n"
161+
"Does the URL point to a directory containing OSM update data?",
162+
base_url)
163+
164+
setup_replication_state(conn, args.table, base_url, seq, date)
165+
166+
LOG.info("Initialised updates for service '%s'.", base_url)
167+
LOG.info("Starting at sequence %d (%s).", seq, date)
168+
169+
return 0
170+
171+
def update(conn, args):
172+
"""\
173+
Download newly available data and apply it to the database.
174+
175+
The data is downloaded in chunks of '--max-diff-size' MB. Each chunk is
176+
saved in a temporary file and imported with osm2pgsql from there. The
177+
temporary file is normally deleted afterwards unless you state an explicit
178+
location with '--diff-file'. Once the database is up to date with the
179+
replication source, the update process exits with 0.
180+
181+
Any additional arguments to osm2pgsql need to be given after '--'. Database
182+
and the prefix parameter are handed through to osm2pgsql. They do not need
183+
to be repeated. '--append' and '--slim' will always be added as well.
184+
"""
185+
with conn.cursor() as cur:
186+
cur.execute('SELECT * FROM pg_tables where tablename = %s', (args.table, ))
187+
if cur.rowcount < 1:
188+
LOG.fatal("Cannot find replication status table. "
189+
"Run 'osm2pgsql-replication init' first.")
190+
return 1
191+
192+
cur.execute('SELECT * FROM "{}"'.format(args.table))
193+
if cur.rowcount != 1:
194+
LOG.fatal("Updates not set up correctly. Run 'osm2pgsql-updates init' first.")
195+
return 1
196+
197+
base_url, seq, ts = cur.fetchone()
198+
LOG.info("Using replication service '%s'. Current sequence %d (%s).",
199+
base_url, seq, ts)
200+
201+
repl = ReplicationServer(base_url)
202+
current = repl.get_state_info()
203+
204+
if seq >= current.sequence:
205+
LOG.info("Database already up-to-date.")
206+
return 0
207+
208+
if args.diff_file is not None:
209+
outfile = Path(args.diff_file)
210+
else:
211+
tmpdir = tempfile.TemporaryDirectory()
212+
outfile = Path(tmpdir.name) / 'osm2pgsql_diff.osc.gz'
213+
214+
osm2pgsql = [args.osm2pgsql_cmd, '--append', '--slim', '--prefix', args.prefix]
215+
osm2pgsql.extend(args.extra_params)
216+
if args.database:
217+
osm2pgsql.extend(('-d', args.database))
218+
if args.username:
219+
osm2pgsql.extend(('-U', args.username))
220+
if args.host:
221+
osm2pgsql.extend(('-H', args.host))
222+
if args.port:
223+
osm2pgsql.extend(('-P', args.port))
224+
osm2pgsql.append(str(outfile))
225+
LOG.debug("Calling osm2pgsql with: %s", ' '.join(osm2pgsql))
226+
227+
while seq < current.sequence:
228+
LOG.debug("Importing from sequence %d", seq)
229+
if outfile.exists():
230+
outfile.unlink()
231+
outhandler = WriteHandler(str(outfile))
232+
endseq = repl.apply_diffs(outhandler, seq + 1,
233+
max_size=args.max_diff_size * 1024)
234+
outhandler.close()
235+
236+
if endseq is None:
237+
LOG.debug("No new diffs found.")
238+
break
239+
240+
subprocess.run(osm2pgsql, check=True)
241+
seq = endseq
242+
243+
nextstate = repl.get_state_info(seq)
244+
update_replication_state(conn, args.table, seq,
245+
nextstate.timestamp if nextstate else None)
246+
247+
if nextstate is not None:
248+
LOG.info("Data imported until %s. Backlog remaining: %s",
249+
nextstate.timestamp,
250+
dt.datetime.now(dt.timezone.utc) - nextstate.timestamp)
251+
252+
if args.once:
253+
break
254+
255+
return 0
256+
257+
def get_args():
258+
parser = ArgumentParser(description=__doc__,
259+
formatter_class=RawDescriptionHelpFormatter)
260+
subs = parser.add_subparsers(title='available commands', dest='subcommand')
261+
262+
default_args = ArgumentParser(add_help=False)
263+
group = default_args.add_argument_group('Default arguments')
264+
group.add_argument('-h', '--help', action='help',
265+
help='Show this help message and exit')
266+
group.add_argument('-q', '--quiet', action='store_const', const=0,
267+
dest='verbose', default=2,
268+
help='Print only error messages')
269+
group.add_argument('-v', '--verbose', action='count', default=2,
270+
help='Increase verboseness of output')
271+
group = default_args.add_argument_group('Database arguments')
272+
group.add_argument('-d', '--database', metavar='DB',
273+
help='Name of PostgreSQL database to connect to or conninfo string')
274+
group.add_argument('-U', '--username', metavar='NAME',
275+
help='PostgreSQL user name')
276+
group.add_argument('-H', '--host', metavar='HOST',
277+
help='Database server host name or socket location')
278+
group.add_argument('-P', '--port', metavar='PORT',
279+
help='Database server port')
280+
group.add_argument('--prefix', metavar='PREFIX', default='planet_osm',
281+
help="Prefix for table names (default 'planet_osm')")
282+
283+
# Arguments for init
284+
cmd = subs.add_parser('init', parents=[default_args],
285+
help=init.__doc__.split('\n', 1)[0],
286+
description=init.__doc__,
287+
formatter_class=RawDescriptionHelpFormatter,
288+
add_help=False)
289+
grp = cmd.add_argument_group('Replication source arguments')
290+
srcgrp = grp.add_mutually_exclusive_group()
291+
srcgrp.add_argument('--osm-file', metavar='FILE',
292+
help='Get replication information from the given file.')
293+
srcgrp.add_argument('--server', metavar='URL',
294+
default='https://planet.openstreetmap.org/replication/minute',
295+
help='Use replication server at the given URL (default: %(default)s)')
296+
cmd.set_defaults(handler=init)
297+
298+
# Arguments for update
299+
cmd = subs.add_parser('update', parents=[default_args],
300+
usage='%(prog)s update [options] [-- param [param ...]]',
301+
help=update.__doc__.split('\n', 1)[0],
302+
description=update.__doc__,
303+
formatter_class=RawDescriptionHelpFormatter,
304+
add_help=False)
305+
cmd.set_defaults(handler=update)
306+
cmd.add_argument('extra_params', nargs='*', metavar='param',
307+
help='Extra parameters to hand in to osm2pgsql.')
308+
grp = cmd.add_argument_group('Update process arguments')
309+
cmd.add_argument('--diff-file', metavar='FILE',
310+
help='File to save changes before they are applied to osm2pgsql.')
311+
cmd.add_argument('--max-diff-size', type=int, default=500,
312+
help='Maximum data to load in MB (default: 500MB)')
313+
cmd.add_argument('--osm2pgsql-cmd', default='osm2pgsql',
314+
help='Path to osm2pgsql command (default: osm2pgsql)')
315+
cmd.add_argument('--once', action='store_true',
316+
help='Run updates only once, even when more data is available.')
317+
318+
args = parser.parse_args()
319+
320+
if args.subcommand is None:
321+
parser.print_help()
322+
exit(1)
323+
324+
return args
325+
326+
327+
def main():
328+
args = get_args()
329+
330+
logging.basicConfig(stream=sys.stderr,
331+
format='{asctime} [{levelname}]: {message}',
332+
style='{',
333+
datefmt='%Y-%m-%d %H:%M:%S',
334+
level=max(4 - args.verbose, 1) * 10)
335+
336+
if '"' in args.prefix:
337+
LOG.fatal("Prefix must not contain quotation marks.")
338+
return 1
339+
340+
args.table = '{}_replication_status'.format(args.prefix)
341+
342+
conn = connect(args)
343+
ret = args.handler(conn, args)
344+
conn.close()
345+
346+
return ret
347+
348+
349+
if __name__ == '__main__':
350+
exit(main())

0 commit comments

Comments
 (0)