Skip to content

Commit a87d3b9

Browse files
authored
Support dependent layer code via "requires" param (#205)
This will resolve openmaptiles/openmaptiles#796. The code will produce parallel or linear SQL file(s), taking into account the `requires` layer parameter. If layer A is required by B and C, it will produce one joined ABC file, while allowing other layers to execute in parallel. * A few comments were added to the output. * Another minor change -- the layer object stores its index (position) within the tileset.
1 parent 3aefb24 commit a87d3b9

File tree

14 files changed

+290
-28
lines changed

14 files changed

+290
-28
lines changed

bin/debug-mvt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,8 @@ async def main(args):
116116
)
117117
layer_sql = mvt.generate_layer(layer_def)
118118
if verbose:
119-
print(f"\n======= Querying layer {layer_id} =======\n{query.strip()}\n"
120-
f"== MVT SQL\n{layer_sql}")
119+
print(f"\n======= Querying layer {layer_id} (#{layer_def.index}) =======\n"
120+
f"{query.strip()}\n== MVT SQL\n{layer_sql}")
121121

122122
# Re-order columns - move osm_id and geometry fields to the right of the table
123123
names = {

bin/generate-sql

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,6 @@ if __name__ == '__main__':
4646

4747
(path / 'run_first.sql').write_text(run_first)
4848
(path / 'run_last.sql').write_text(run_last)
49-
for idx, sql in enumerate(parallel_sql):
50-
(parallel_dir / f'{idx:03}.sql').write_text(sql)
49+
for file, sql in parallel_sql.items():
50+
(parallel_dir / f'{file}.sql').write_text(sql)
5151
print(f'Created {len(parallel_sql)} sql files for parallel execution at {path}')

openmaptiles/sql.py

Lines changed: 68 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,87 @@
11
import re
2-
from typing import Union
2+
from typing import Union, Dict, Tuple
33

44
from sys import stderr
55

66
from openmaptiles.tileset import Tileset, Layer
77

88

9-
def collect_sql(tileset_filename, parallel=False, nodata=False):
10-
"""If parallel is True, returns a sql value that must be executed first,
11-
and a lists of sql values that can be ran in parallel.
9+
def collect_sql(tileset_filename, parallel=False, nodata=False
10+
) -> Union[str, Tuple[str, Dict[str, str], str]]:
11+
"""If parallel is True, returns a sql value that must be executed first, last,
12+
and a dict of names -> sql code that can be ran in parallel.
1213
If parallel is False, returns a single sql string.
1314
nodata=True replaces all "/* DELAY_MATERIALIZED_VIEW_CREATION */"
1415
with the "WITH NO DATA" SQL."""
15-
tileset = Tileset.parse(tileset_filename)
16+
tileset = Tileset(tileset_filename)
17+
18+
run_first = "-- This SQL code should be executed first\n\n" + \
19+
get_slice_language_tags(tileset.languages)
20+
# at this point we don't have any SQL to run at the end
21+
run_last = "-- This SQL code should be executed last\n"
22+
23+
# resolved is a map of layer ID to some ID in results.
24+
# the ID in results could be the same as layer ID, or it could be a tuple of IDs
25+
resolved = {}
26+
# results is an ID -> SQL content map
27+
results = {}
28+
unresolved = tileset.layers_by_id.copy()
29+
last_count = -1
30+
# safety to prevent infinite loop, even though it is also checked in tileset
31+
while len(resolved) > last_count:
32+
last_count = len(resolved)
33+
for lid, layer in list(unresolved.items()):
34+
if all((v in resolved for v in layer.requires)):
35+
# All requirements have been resolved.
36+
resolved[lid] = lid
37+
results[lid] = layer_to_sql(layer, nodata)
38+
del unresolved[lid]
39+
40+
if layer.requires:
41+
# If there are more than one requirement, merge them first,
42+
# e.g. if there are layers A, B, and C; and C requires A & B,
43+
# first concatenate A and B, and then append C to them.
44+
# Make sure the same code is not merged multiple times
45+
mix = list(layer.requires) + [lid]
46+
lid1 = mix[0]
47+
for idx in range(1, len(mix)):
48+
lid2 = mix[idx]
49+
res_id1 = resolved[lid1]
50+
res_id2 = resolved[lid2]
51+
if res_id1 == res_id2:
52+
continue
53+
merged_id = res_id1 + "__" + res_id2
54+
if merged_id in results:
55+
raise ValueError(f"Naming collision - {merged_id} exists")
56+
# NOTE: merging will move entity to the end of the list
57+
results[merged_id] = results[res_id1] + "\n" + results[res_id2]
58+
del results[res_id1]
59+
del results[res_id2]
60+
# Update resolved IDs to point to the merged result
61+
for k, v in resolved.items():
62+
if v == res_id1 or v == res_id2:
63+
resolved[k] = merged_id
64+
if unresolved:
65+
raise ValueError(f"Circular dependency found in layer requirements: " +
66+
', '.join(unresolved.keys()))
67+
68+
if not parallel:
69+
sql = '\n'.join(results.values())
70+
return f"{run_first}\n{sql}\n{run_last}"
71+
else:
72+
return run_first, results, run_last
1673

17-
run_first = get_slice_language_tags(tileset.languages)
18-
run_last = '' # at this point we don't have any SQL to run at the end
1974

20-
parallel_sql = []
21-
for layer in tileset.layers:
22-
schemas = '\n\n'.join((to_sql(v, layer, nodata) for v in layer.schemas))
23-
parallel_sql.append(f"""\
75+
def layer_to_sql(layer: Layer, nodata: bool):
76+
schemas = '\n\n'.join((to_sql(v, layer, nodata) for v in layer.schemas))
77+
sql = f"""\
2478
DO $$ BEGIN RAISE NOTICE 'Processing layer {layer.id}'; END$$;
2579
2680
{schemas}
2781
2882
DO $$ BEGIN RAISE NOTICE 'Finished layer {layer.id}'; END$$;
29-
""")
30-
31-
if parallel:
32-
return run_first, parallel_sql, run_last
33-
else:
34-
return run_first + '\n'.join(parallel_sql) + run_last
83+
"""
84+
return sql.strip() + "\n"
3585

3686

3787
def get_slice_language_tags(languages):
@@ -143,7 +193,7 @@ def sql_value(value):
143193
return "E'" + value.replace('\\', '\\\\').replace("'", "\\'") + "'"
144194

145195

146-
def to_sql(sql, layer, nodata):
196+
def to_sql(sql: str, layer: Layer, nodata: bool):
147197
"""Clean up SQL, and perform any needed code injections"""
148198
sql = sql.strip()
149199

openmaptiles/tileset.py

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ def tag_fields_to_sql(fields):
1717

1818
@dataclass
1919
class ParsedData:
20-
data: dict
20+
data: Union[dict, str]
2121
path: Path
2222

2323

@@ -67,8 +67,15 @@ def parse(layer_source: Union[str, Path, ParsedData]) -> 'Layer':
6767

6868
def __init__(self,
6969
layer_source: Union[str, Path, ParsedData],
70-
tileset: 'Tileset' = None):
70+
tileset: 'Tileset' = None,
71+
index: int = None):
72+
"""
73+
:param layer_source: load layer from this source, e.g. a file
74+
:param tileset: parent tileset object (optional)
75+
:param index: layer's position index within the tileset
76+
"""
7177
self.tileset = tileset
78+
self.index = index
7279

7380
if isinstance(layer_source, ParsedData):
7481
self.filename = layer_source.path
@@ -87,12 +94,29 @@ def __init__(self,
8794

8895
self.imposm_mappings = [parse_file(f) for f in self.imposm_mapping_files]
8996

90-
self.schemas = [Path(layer_dir, f).read_text('utf-8')
91-
for f in self.definition.get('schema', [])]
97+
schemas = [
98+
(f.path, f.data) if isinstance(f, ParsedData)
99+
else (f, Path(layer_dir, f).read_text('utf-8'))
100+
for f in self.definition.get('schema', [])
101+
]
102+
self.schemas = [f"-- Layer {self.id} - {p}\n\n{d}" for p, d in schemas]
92103

93104
self.fields = [Field(k, v) for k, v in
94105
self.definition['layer']['fields'].items()]
95106

107+
if 'requires' in self.definition['layer']:
108+
self.requires = self.definition['layer']['requires']
109+
if isinstance(self.requires, str):
110+
self.requires = [self.requires]
111+
if (
112+
not isinstance(self.requires, list) or
113+
any(not isinstance(v, str) or v == "" for v in self.requires)
114+
):
115+
raise ValueError("If defined, 'requires' parameter must be the ID "
116+
"of another layer, or a list of layer IDs")
117+
else:
118+
self.requires = []
119+
96120
validate_properties(self, f"Layer {self.filename}")
97121

98122
if any(v.name == self.geometry_field for v in self.fields):
@@ -219,8 +243,35 @@ def __init__(self, tileset_source: Union[str, Path, ParsedData]):
219243

220244
self.definition = self.definition['tileset']
221245
self.layers = []
222-
for layer_filename in self.definition['layers']:
223-
self.layers.append(Layer(layer_filename, self))
246+
self.layers_by_id = {}
247+
for index, layer_filename in enumerate(self.definition['layers']):
248+
layer = Layer(layer_filename, self, index)
249+
if layer.id in self.layers_by_id:
250+
raise ValueError(f"Layer '{layer.id}' is defined more than once")
251+
self.layers.append(layer)
252+
self.layers_by_id[layer.id] = layer
253+
254+
# Detect circular dependencies and missing layer IDs for the 'requires' field
255+
resolved = set()
256+
unresolved = self.layers_by_id.copy()
257+
last_count = -1
258+
while len(resolved) > last_count:
259+
last_count = len(resolved)
260+
for lid, layer in list(unresolved.items()):
261+
for req in layer.requires:
262+
if req not in self.layers_by_id:
263+
raise ValueError(f"Unknown layer '{req}' required for "
264+
f"layer {layer.id}")
265+
if req not in resolved:
266+
break
267+
else:
268+
# all requirements are already in resolved (or no reqs)
269+
resolved.add(lid)
270+
del unresolved[lid]
271+
if unresolved:
272+
raise ValueError(f"Circular dependency found in layer requirements: " +
273+
', '.join(unresolved.keys()))
274+
224275
validate_properties(self, f"Tileset {self.filename}")
225276

226277
@deprecated(version='3.2.0', reason='use named properties instead')

tests/expected/parallel_sql/parallel/001.sql renamed to tests/expected/parallel_sql/parallel/enumfield.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
DO $$ BEGIN RAISE NOTICE 'Processing layer enumfield'; END$$;
22

3+
-- Layer enumfield - ./enumfield.sql
4+
35
CREATE OR REPLACE FUNCTION map_landuse_class("natural" VARCHAR, landuse VARCHAR) RETURNS TEXT AS $$
46
SELECT CASE
57
WHEN "natural" = 'bare_rock' THEN 'rock'

tests/expected/parallel_sql/parallel/000.sql renamed to tests/expected/parallel_sql/parallel/housenumber.sql

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
DO $$ BEGIN RAISE NOTICE 'Processing layer housenumber'; END$$;
22

3+
-- Layer housenumber - ./housenumber_centroid.sql
4+
5+
36
-- etldoc: osm_housenumber_point -> osm_housenumber_point
47
UPDATE osm_housenumber_point SET geometry=topoint(geometry)
58
WHERE ST_GeometryType(geometry) <> 'ST_Point';
69

10+
-- Layer housenumber - ./layer.sql
11+
12+
713
-- etldoc: layer_housenumber[shape=record fillcolor=lightpink, style="rounded,filled",
814
-- etldoc: label="layer_housenumber | <z14_> z14_" ] ;
915

tests/expected/parallel_sql/run_first.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
-- This SQL code should be executed first
2+
13
CREATE OR REPLACE FUNCTION slice_language_tags(tags hstore)
24
RETURNS hstore AS $$
35
SELECT delete_empty_keys(slice(tags, ARRAY['name:en', 'name:de', 'name:cs', 'int_name', 'loc_name', 'name', 'wikidata', 'wikipedia']))
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
-- This SQL code should be executed last

tests/expected/parallel_sql2/parallel/001.sql renamed to tests/expected/parallel_sql2/parallel/enumfield.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
DO $$ BEGIN RAISE NOTICE 'Processing layer enumfield'; END$$;
22

3+
-- Layer enumfield - ./enumfield.sql
4+
35
CREATE OR REPLACE FUNCTION map_landuse_class("natural" VARCHAR, landuse VARCHAR) RETURNS TEXT AS $$
46
SELECT CASE
57
WHEN "natural" = 'bare_rock' THEN 'rock'

tests/expected/parallel_sql2/parallel/000.sql renamed to tests/expected/parallel_sql2/parallel/housenumber.sql

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
DO $$ BEGIN RAISE NOTICE 'Processing layer housenumber'; END$$;
22

3+
-- Layer housenumber - ./housenumber_centroid.sql
4+
5+
36
-- etldoc: osm_housenumber_point -> osm_housenumber_point
47
UPDATE osm_housenumber_point SET geometry=topoint(geometry)
58
WHERE ST_GeometryType(geometry) <> 'ST_Point';
69

10+
-- Layer housenumber - ./layer.sql
11+
12+
713
-- etldoc: layer_housenumber[shape=record fillcolor=lightpink, style="rounded,filled",
814
-- etldoc: label="layer_housenumber | <z14_> z14_" ] ;
915

0 commit comments

Comments
 (0)