diff --git a/CHANGELOG.md b/CHANGELOG.md index 2edc308..d5fc080 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,16 @@ # Change log +### 0.3.0 +- Update converters to emit OSW 0.3 schema id and support new vegetation features (trees, tree rows, woods). +- Extend OSW normalizers to keep `leaf_cycle` and `leaf_type` where allowed for points, lines, and polygons. +- Add unit coverage for OSW 0.3 natural feature handling. +- Expand OSM normalizer coverage and robustness: preserve non-compliant/unknown tags as `ext:*`, canonicalize JSON ext values, normalize elevation from 3D geometries, tolerate string IDs, and harden edge-case handling with tests. +- OSW→OSM improvements: promote invalid/unknown fields (incl. dict/list) to `ext:*`, set `version="1"` for visible elements, derive `ext:elevation` from Z coords, and keep invalid incline/climb values under `ext:` instead of dropping them. +- OSM→OSW improvements: verify OSW 0.3 `$schema` headers, export tree/tree_row/wood features, treat `ext:` tags as valid identifiers in OSW normalizers for filtering, and add multi-exterior handling tests for zones/polygons plus line parsing guards. +- Added extensive unit tests for osm/osw normalizers and graph serializers (filters, geojson import/export, zebra crossing mapping, kerb/foot validators, invalid line/polygon/zone branches, ref normalization, etc.). +- Added fixtures for vegetation and 3D elevation scenarios (`tree-test.xml`) and custom-property round-trip checks. +- Implemented collision-free ID handling: sequential remapping of nodes/ways/relations on OSW→OSM export with reference rewrites, plus tests confirming sequential IDs and schema/tag updates. + ### 0.2.13 - Added default `version="1"` attribute to all nodes, ways, and relations generated during OSW→OSM conversion. - Introduced unit test coverage to verify version attributes are written for all OSM elements. diff --git a/docs/TESTING_OVERVIEW.md b/docs/TESTING_OVERVIEW.md new file mode 100644 index 0000000..0e235b3 --- /dev/null +++ b/docs/TESTING_OVERVIEW.md @@ -0,0 +1,35 @@ +Below is a breakdown of all test scenarios (`def test_` and `async def test_`) across the suite and the main areas they cover. + +Total scenarios: 211 +Code coverage: 98% (`coverage report`) + +Test module | Scenarios | Focus / scenarios covered (key checks) +--- | --- | --- +tests/unit_tests/helpers/test_osm.py | 11 | Way/node/point/line/zone/polygon counters; entity counter dispatch; OSMGraph creation and simplification/geometry construction lifecycles; filter helpers return booleans for tagged inputs +tests/unit_tests/helpers/test_osw.py | 22 | OSW filters (all geometries); unzip/merge optional files & missing-file handling; simplify/construct graph end-to-end; per-entity counters; ext tag retention; temp-file cleanup; optional files presence/absence; merge deletes intermediates +tests/unit_tests/helpers/test_response.py | 6 | Response dataclass defaults, attribute access, repr/str formatting, mutation for lists/strings/error payloads +tests/unit_tests/test_formatter.py | 7 | Formatter success/error paths; converter delegation; cleanup lifecycle; exception surfacing; mocking converter calls; workdir creation/idempotence; cleanup of existing vs missing files +tests/unit_tests/test_osm2osw/test_osm2osw.py | 15 | OSM→OSW: file counts/types; width/incline validation/cleaning; `$schema`=0.3 headers; ext-tag passthrough; point geometry enforcement; duplicate vs unique IDs; schema header verification; bad-input path; type assertions; tree/tree_row/wood export coverage; invalid-node-tag skip logic +tests/unit_tests/test_osm_compliance/test_osm_compliance.py | 2 | OSW validation of OSW→OSM→OSW roundtrip; incline tag preservation using official validator +tests/unit_tests/test_osw2osm/test_osw2osm.py | 16 | OSW→OSM: version attrs (visible/non-visible); incline/climb handling; invalid incline to `ext:`; custom dict/list props to `ext:`; 3D elevation to `ext:elevation`; missing-zip error; invalid width to `ext:`; XML output type assertions; ext tags on invalid properties; climb suppression when incline present; sequential ID remap and ref rewrite assertions +tests/unit_tests/test_roundtrip/test_roundtrip.py | 2 | Full roundtrip (OSW zip → OSM XML → OSW → OSM) smoke checks, ID preservation, schema continuity, ext:* tag parity for both OSW-zip and raw-OSM starting points +tests/unit_tests/test_serializer/test_osm_graph.py | 37 | Parsers (ways/nodes/points/lines/zones/polygons) incl. invalid locations & multi-exteriors; tagged node parsing; simplify/construct geometries; to_geojson ID/export rules; to_undirected variants; filter_edges node-copy; from_geojson import/export; point ID prefix trimming; progress callbacks; invalid location skip logic; duplicate-id protection; empty-graph exports +tests/unit_tests/test_serializer/test_osm_normalizer.py | 19 | OSM normalizer tag filtering: datatype coercion/NaN removal; incline/climb/foot handling; ext tag retention; zebra crossing mapping; kerb/foot validators; width/incline edge cases; implied foot removal; `_id` sourcing when tags absent/empty +tests/unit_tests/test_serializer/test_osm_osm_normalizer.py | 11 | OSM normalizer edge cases: `_stash_ext` JSON canonicalization/errors/unknown keys; dict/list promotion to `ext:`; zone area tagging; elevation extraction fallbacks; ID normalization across nodes/ways/relations and refs/nodeRefs/refs; ref write-back branches; non-numeric ID tolerance; canonical `ext:` serialization +tests/unit_tests/test_serializer/test_osw_normalizer.py | 63 | OSW normalizer: filters/normalizers for all feature types; tree/tree_row/wood support; leaf_cycle/leaf_type validation; crossing markings (incl. zebra inference); kerb/foot/surface validators; invalid branches raising; keep_key/default behaviors; width/incline/climb handling; ext tag passthrough and ext-based filter classification; literal keep-key handling; natural-* guards; tactile paving/surface normalization + +Detailed scenario highlights (what we explicitly exercise) +- tests/unit_tests/helpers/test_osm.py: async counters on `wa.microsoft.osm.pbf` confirm expected counts for ways/points/nodes; `get_osm_graph` builds an `OSMGraph` then `simplify_og`/`construct_geometries` run without mutating return types; way/node/point/zone/polygon filters accept tagged inputs and return booleans. +- tests/unit_tests/helpers/test_osw.py: counts for ways/nodes/points/zones/lines/polygons across the same PBF; unzip returns the expected nodes/edges/points artifacts and gracefully returns empty dict when files are missing; merge combines multiple GeoJSON FeatureCollections and deletes temp inputs; zone/polygon filters assert boolean output; temp cleanup covers both existing and already-removed files. +- tests/unit_tests/helpers/test_response.py: default `Response` has `status=True` with `None` files/error; supports list or string `generated_files`; preserves custom error messages and `None` errors in success cases. +- tests/unit_tests/test_formatter.py: `Formatter.osm2osw` happy/failed paths surface `Response.status`; workdir is created idempotently whether or not it exists; cleanup removes tracked files and ignores missing ones; `Formatter.osw2osm` delegates to `OSW2OSM.convert` exactly once (mocked) and propagates its response. +- tests/unit_tests/test_osm2osw/test_osm2osw.py: end-to-end conversion yields six outputs (nodes/points/edges/zones/polygons/lines) with string paths; GeoJSONs contain non-empty geometries with string `_id`s and no duplicates; width tags are numeric, incline tags remain numeric on edges, and invalid node tags lead to no files; `$schema` header equals 0.3 and carries through tree/tree_row/wood fixtures; ext:* properties are preserved; file naming matches expected entity types; failure path returns `status=False`. +- tests/unit_tests/test_osm_compliance/test_osm_compliance.py: runs OSW→OSM→OSW through `python_osw_validation` to assert zero validation issues; checks that incline tags survive the full round-trip. +- tests/unit_tests/test_osw2osm/test_osw2osm.py: converts OSW ZIPs to a single OSM XML, ensuring width tags are present and numeric; error path when ZIP is missing; incline tags are present but climb tags are stripped or shifted to `ext:incline` for invalid values; custom/non-compliant properties (dict/list) are promoted to ext:* JSON; 3D node coordinates emit `ext:elevation`; `_ensure_version_attribute` backfills version on visible elements; sequential ID remap rewrites ids/refs; all generated paths are strings and end with `.xml`. +- tests/unit_tests/test_roundtrip/test_roundtrip.py: two smoke flows keep ext:* tags intact—(1) OSW ZIP → OSM XML → OSW → OSM, (2) raw OSM XML → OSW → OSM—comparing node/way ext:* sets for equality. +- tests/unit_tests/test_serializer/test_osm_graph.py: graph metadata (directed/multigraph) and undirected copies retain node attrs; parsers handle missing nodes/invalid coordinates and multi-exterior polygons/zones; tagged-node parser only ingests OSW nodes; simplify/construct geometries rebuild missing geometries for points/lines with node refs; `to_geojson` preserves IDs, trims point prefixes, handles empty graphs, exports progress callbacks; `from_geojson` ingests features and respects mapping hooks and filter functions. +- tests/unit_tests/test_serializer/test_osm_normalizer.py: width/incline/climb coercion removes NaN/invalid strings, retains valid ints/floats; climb removal rules when incline present, except steps keep climb/down; ext_osm_id assignment prefers tags but falls back to internal IDs and skips empty values; implied foot tags dropped where inappropriate. +- tests/unit_tests/test_serializer/test_osm_osm_normalizer.py: `_stash_ext` normalizes JSON strings, skips None, and serializes unknown structures; filter_tags moves unknown keys or invalid datatypes to ext:* and adds area tags for zones; elevation extraction rejects NaN, falling back through z/ele tags; ID normalization covers negative IDs and writes back refs/nodeRefs/refs variants. +- tests/unit_tests/test_serializer/test_osw_normalizer.py: validators classify sidewalks/crossings/traffic islands/stairs/living streets/powerpoles/trees/tree_rows/wood; invalid geometries raise where expected; stair normalization keeps/drops climb per validity and defaults highway/foot; width/incline/climb handling mirrors OSM normalizer; crossing markings inferred from zebra tags; keep_key/default behaviors honored; tactile paving/surface/leaf_cycle/leaf_type/kerb/foot rules validated; natural-* checks drop invalid feature types. + +Method: counted functions matching `def test_` and `async def test_` under `tests/` and grouped scenario themes per file. diff --git a/docs/id_remapping.md b/docs/id_remapping.md new file mode 100644 index 0000000..0a0298e --- /dev/null +++ b/docs/id_remapping.md @@ -0,0 +1,72 @@ +# ID Remapping (OSW → OSM) + +This document explains how IDs are generated and remapped when converting OSW (GeoJSON) to OSM XML. + +## Goal +Produce collision-free OSM XML where all node/way/relation IDs are sequential per type (starting at 1) and all references are updated accordingly, while preserving OSW identifiers in `_id` tags. + +## Process +1. **Initial IDs from OSW content** + - Nodes/points/lines/zones/polygons parsed from OSW GeoJSON enter the OSM graph with their OSW `_id`/references. + - Extension/unknown properties are preserved under `ext:*`; elevation from 3D coordinates becomes `ext:elevation`. + +2. **OSW→OSM export** + - `OSW2OSM.convert()` runs the normal ogr2osm pipeline, writing an OSM XML file. + - `_ensure_version_attribute` ensures all elements have `version="1"` (visible elements get it if missing). + +3. **Sequential remap** + - After the XML is written, `_remap_ids_to_sequential` rewrites IDs and references: + - Nodes are renumbered `1..N` in document order; their `_id` tags are updated to the new ID. + - Ways are renumbered `1..M`; their `_id` tags are updated. All `` values are rewritten to the new node IDs. + - Relations are renumbered `1..K`; their `_id` tags are updated. All `` values are rewritten based on member `type` (node/way/relation) using the new ID maps. + - The remap runs in-place on the XML so the final output has consistent, collision-free IDs and references. + +4. **What remains** + - Original OSW identifiers survive in other tags (e.g., `ext:osm_id` if provided, other `ext:*`), but `_id` reflects the new sequential OSM ID. + +## Notes / rationale +- The remap ensures deterministic, collision-free IDs regardless of source naming schemes (e.g., OSW prefixes, extension data). +- Reference integrity is maintained by rewriting all node refs in ways and member refs in relations. +- Version attributes are normalized before remapping to satisfy OSM validators expecting `version`. + +## Minimal example (what the remap does) +Input XML (simplified): +```xml + + + + + + + + + + + + + +``` + +After `_remap_ids_to_sequential`: +```xml + + + + + + + + + + + + + +``` +All IDs now start at 1 per type, and every reference points to the remapped IDs. + +## Relevant code +- Entry point: `OSW2OSM.convert()` (`src/osm_osw_reformatter/osw2osm/osw2osm.py`) + - Calls `_ensure_version_attribute` + - Calls `_remap_ids_to_sequential` +- Remap implementation: `_remap_ids_to_sequential` in `osw2osm.py` rewrites element IDs and their refs in-place and updates `_id` tags. diff --git a/requirements.txt b/requirements.txt index a433c7a..b000760 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,4 @@ shapely~=2.0.2 pyproj~=3.6.1 coverage~=7.5.1 ogr2osm==1.2.0 -python-osw-validation==0.2.15 \ No newline at end of file +python-osw-validation==0.3.1 \ No newline at end of file diff --git a/src/osm_osw_reformatter/osw2osm/osw2osm.py b/src/osm_osw_reformatter/osw2osm/osw2osm.py index 505164b..2d6e819 100644 --- a/src/osm_osw_reformatter/osw2osm/osw2osm.py +++ b/src/osm_osw_reformatter/osw2osm/osw2osm.py @@ -34,6 +34,7 @@ def convert(self) -> Response: data_writer = ogr2osm.OsmDataWriter(output_file, suppress_empty_tags=True) osm_data.output(data_writer) self._ensure_version_attribute(output_file) + self._remap_ids_to_sequential(output_file) del translation_object del datasource @@ -64,3 +65,51 @@ def _ensure_version_attribute(osm_xml_path: Path) -> None: element.set('version', '1') tree.write(osm_xml_path, encoding='utf-8', xml_declaration=True) + + @staticmethod + def _remap_ids_to_sequential(osm_xml_path: Path) -> None: + """Remap node/way/relation IDs to sequential values starting at 1 and update references.""" + try: + tree = ET.parse(osm_xml_path) + except Exception: + return + + root = tree.getroot() + + def remap_elements(xpath: str): + mapping = {} + elems = root.findall(xpath) + for idx, elem in enumerate(elems, start=1): + old_id = elem.get('id') + if old_id is None: + continue + mapping[old_id] = str(idx) + elem.set('id', str(idx)) + for tag in elem.findall("./tag[@k='_id']"): + tag.set('v', str(idx)) + return mapping + + node_map = remap_elements('.//node') + way_map = remap_elements('.//way') + rel_map = remap_elements('.//relation') + + # Update way nd refs + for way in root.findall('.//way'): + for nd in way.findall('nd'): + ref = nd.get('ref') + if ref in node_map: + nd.set('ref', node_map[ref]) + + # Update relation member refs + for rel in root.findall('.//relation'): + for member in rel.findall('member'): + ref = member.get('ref') + m_type = member.get('type') + if m_type == 'node' and ref in node_map: + member.set('ref', node_map[ref]) + elif m_type == 'way' and ref in way_map: + member.set('ref', way_map[ref]) + elif m_type == 'relation' and ref in rel_map: + member.set('ref', rel_map[ref]) + + tree.write(osm_xml_path, encoding='utf-8', xml_declaration=True) diff --git a/src/osm_osw_reformatter/serializer/osm/osm_graph.py b/src/osm_osw_reformatter/serializer/osm/osm_graph.py index a1335ba..41e7117 100644 --- a/src/osm_osw_reformatter/serializer/osm/osm_graph.py +++ b/src/osm_osw_reformatter/serializer/osm/osm_graph.py @@ -730,3 +730,9 @@ def from_geojson(cls, nodes_path, edges_path): for edge_feature in edges_fc['features']: props = edge_feature['properties'] + u = props.pop('_u_id') + v = props.pop('_v_id') + props['geometry'] = shape(edge_feature['geometry']) + G.add_edges_from([(u, v, props)]) + + return osm_graph diff --git a/src/osm_osw_reformatter/serializer/osm/osm_normalizer.py b/src/osm_osw_reformatter/serializer/osm/osm_normalizer.py index 457d570..4a5114c 100644 --- a/src/osm_osw_reformatter/serializer/osm/osm_normalizer.py +++ b/src/osm_osw_reformatter/serializer/osm/osm_normalizer.py @@ -1,3 +1,5 @@ +import json +import math import ogr2osm class OSMNormalizer(ogr2osm.TranslationBase): @@ -14,6 +16,73 @@ class OSMNormalizer(ogr2osm.TranslationBase): 'step_count': int, } + OSM_ALLOWED_TAGS = { + '_id', + '_u_id', + '_v_id', + '_w_id', + 'area', + 'amenity', + 'barrier', + 'building', + 'climb', + 'crossing:markings', + 'description', + 'emergency', + 'ext:maxspeed', + 'foot', + 'footway', + 'highway', + 'incline', + 'kerb', + 'leaf_cycle', + 'leaf_type', + 'length', + 'man_made', + 'name', + 'natural', + 'opening_hours', + 'power', + 'service', + 'step_count', + 'surface', + 'tactile_paving', + 'width', + } + + def _stash_ext(self, tags, key, value): + """Preserve non-compliant values under an ext: namespace.""" + if value is None: + return + try: + if isinstance(value, (dict, list)): + safe_value = json.dumps(value, separators=(",", ": ")) + elif isinstance(value, str): + # Normalize JSON-like strings to canonical compact form + stripped = value.strip() + if (stripped.startswith("{") and stripped.endswith("}")) or ( + stripped.startswith("[") and stripped.endswith("]") + ): + try: + safe_value = json.dumps(json.loads(stripped), separators=(",", ": ")) + except Exception: + safe_value = value + else: + safe_value = value + else: + safe_value = str(value) + except Exception: + safe_value = str(value) + # Final canonicalization for any JSON-like string + if isinstance(safe_value, str): + s = safe_value.strip() + if (s.startswith("{") and s.endswith("}")) or (s.startswith("[") and s.endswith("]")): + try: + safe_value = json.dumps(json.loads(s), separators=(",", ": ")) + except Exception: + pass + tags[f"ext:{key}"] = safe_value + def _check_datatypes(self, tags): for key, expected_type in self.OSM_TAG_DATATYPES.items(): value = tags.get(key) @@ -21,10 +90,12 @@ def _check_datatypes(self, tags): try: cast_value = expected_type(value) if isinstance(cast_value, float) and (cast_value != cast_value): # NaN check + self._stash_ext(tags, key, value) tags.pop(key) else: tags[key] = str(cast_value) except (ValueError, TypeError): + self._stash_ext(tags, key, value) tags.pop(key) def filter_tags(self, tags): @@ -32,6 +103,17 @@ def filter_tags(self, tags): Override this method if you want to modify or add tags to the xml output ''' + # Promote non-serializable values (e.g., dict/list) to ext: namespace + for key in list(tags.keys()): + value = tags[key] + if isinstance(value, (dict, list)): + self._stash_ext(tags, key, value) + tags.pop(key, None) + elif key not in self.OSM_ALLOWED_TAGS and not key.startswith('ext:'): + # Preserve unknown/non-compliant fields as ext: tags + self._stash_ext(tags, key, value) + tags.pop(key, None) + # Handle zones if 'highway' in tags and tags['highway'] == 'pedestrian' and '_w_id' in tags and tags['_w_id']: tags['area'] = 'yes' @@ -47,13 +129,15 @@ def filter_tags(self, tags): # OSW fields with similar OSM field names if 'climb' in tags: if tags.get('highway') != 'steps' or tags['climb'] not in ('up', 'down'): + self._stash_ext(tags, 'climb', tags.get('climb')) tags.pop('climb', '') if 'incline' in tags: try: incline_val = float(str(tags['incline'])) except (ValueError, TypeError): - # Drop the incline tag if it cannot be interpreted as a float + # Preserve invalid incline as extension + self._stash_ext(tags, 'incline', tags.get('incline')) tags.pop('incline', '') else: # Normalise numeric incline values by casting to string @@ -69,23 +153,87 @@ def process_feature_post(self, osmgeometry, ogrfeature, ogrgeometry): ogr feature and ogr geometry used to create the object are passed as well. Note that any return values will be discarded by ogr2osm. ''' + def _set_tag(osm_obj, key, value): + tags = getattr(osm_obj, "tags", None) + if tags is None: + return + if isinstance(tags.get(key), list): + tags[key] = [value] + elif key in tags: + tags[key] = value + else: + tags[key] = [value] if any(isinstance(v, list) for v in tags.values()) else value + osm_id = None # ext:osm_id is probably in the tags dictionary as 'ext:osm_id' or similar + def _as_int(val): + try: + return int(val) + except (ValueError, TypeError): + return None + if 'ext:osm_id' in osmgeometry.tags and osmgeometry.tags['ext:osm_id'][0]: - osm_id = int(osmgeometry.tags['ext:osm_id'][0]) + osm_id = _as_int(osmgeometry.tags['ext:osm_id'][0]) elif '_id' in osmgeometry.tags and osmgeometry.tags['_id'][0]: - osm_id = int(osmgeometry.tags['_id'][0]) + osm_id = _as_int(osmgeometry.tags['_id'][0]) if osm_id is not None: osmgeometry.id = osm_id + elevation = self._extract_elevation(ogrgeometry) + if elevation is not None: + _set_tag(osmgeometry, "ext:elevation", str(elevation)) + + def _extract_elevation(self, ogrgeometry): + """Return the Z value of the first coordinate, if present and valid.""" + if ogrgeometry is None: + return None + + try: + dim = ogrgeometry.GetCoordinateDimension() + if dim < 3: + return None + except Exception: + return None + + def _first_point(geom): + try: + if geom.GetPointCount() > 0: + return geom.GetPoint(0) + except Exception: + pass + try: + if geom.GetGeometryCount() > 0: + ref = geom.GetGeometryRef(0) + if ref and ref.GetPointCount() > 0: + return ref.GetPoint(0) + except Exception: + pass + return None + + point = _first_point(ogrgeometry) + if not point or len(point) < 3: + return None + + try: + z_val = float(point[2]) + except (ValueError, TypeError): + return None + + if math.isnan(z_val): + return None + + return z_val def process_output(self, osmnodes, osmways, osmrelations): """ - Convert negative IDs into deterministic 63-bit positive IDs - for all nodes, ways, and relations (and their references), - and add a '_id' tag with the new derived positive ID. + Remap all element IDs to sequential, collision-free values per type. + Adds a '_id' tag with the new derived positive ID and rewrites + references accordingly. """ - mask_63bit = (1 << 63) - 1 + # Capture original IDs for mapping + node_id_map = {} + way_id_map = {} + rel_id_map = {} def _set_id_tag(osm_obj, new_id): tags = getattr(osm_obj, "tags", None) @@ -112,40 +260,43 @@ def _set_id_tag(osm_obj, new_id): else: tags["_id"] = value - def _normalise_id(osm_obj): - if osm_obj.id < 0: - new_id = osm_obj.id & mask_63bit - osm_obj.id = new_id - _set_id_tag(osm_obj, new_id) - return new_id - return osm_obj.id - - # Fix node IDs + # Remap node IDs sequentially starting at 1 for node in osmnodes: - _normalise_id(node) + old_id = getattr(node, "id", None) + if old_id is None: + continue + new_id = len(node_id_map) + 1 + node_id_map[old_id] = new_id + node.id = new_id + _set_id_tag(node, new_id) - # Fix ways and their node references + # Remap way IDs and rewrite node refs for way in osmways: - _normalise_id(way) + old_id = getattr(way, "id", None) + if old_id is not None: + new_id = len(way_id_map) + 1 + way_id_map[old_id] = new_id + way.id = new_id + _set_id_tag(way, new_id) - # Detect how node references are stored node_refs = getattr(way, "nds", None) or getattr(way, "refs", None) or getattr(way, "nodeRefs", None) or getattr(way, "nodes", None) - if node_refs is not None: new_refs = [] for ref in node_refs: - # Handle both int and OsmNode-like objects if isinstance(ref, int): - new_refs.append(ref & mask_63bit if ref < 0 else ref) + if ref not in node_id_map: + new_id = len(node_id_map) + 1 + node_id_map[ref] = new_id + new_refs.append(node_id_map.get(ref, ref)) elif hasattr(ref, "id"): - if ref.id < 0: - ref.id = ref.id & mask_63bit - _set_id_tag(ref, ref.id) + if ref.id not in node_id_map: + new_id = len(node_id_map) + 1 + node_id_map[ref.id] = new_id + ref.id = node_id_map.get(ref.id, ref.id) new_refs.append(ref) else: new_refs.append(ref) - # Write back using whichever attribute exists if hasattr(way, "nds"): way.nds = new_refs elif hasattr(way, "refs"): @@ -155,28 +306,35 @@ def _normalise_id(osm_obj): elif hasattr(way, "nodes"): way.nodes = new_refs - # Fix relation IDs and their member refs + # Remap relation IDs and member refs for rel in osmrelations: - if rel.id < 0: - rel.id = rel.id & mask_63bit - _normalise_id(rel) + old_id = getattr(rel, "id", None) + if old_id is not None: + new_id = len(rel_id_map) + 1 + rel_id_map[old_id] = new_id + rel.id = new_id + _set_id_tag(rel, new_id) if hasattr(rel, "members"): for member in rel.members: - if hasattr(member, "ref"): - ref = member.ref - if isinstance(ref, int) and ref < 0: - member.ref = ref & mask_63bit - elif hasattr(ref, "id") and ref.id < 0: - ref.id = ref.id & mask_63bit - _set_id_tag(ref, ref.id) - - # Ensure deterministic ordering now that IDs have been normalised + if not hasattr(member, "ref"): + continue + ref = member.ref + if isinstance(ref, int): + if ref not in node_id_map and ref not in way_id_map and ref not in rel_id_map: + new_id = len(rel_id_map) + 1 + rel_id_map[ref] = new_id + member.ref = node_id_map.get(ref, way_id_map.get(ref, rel_id_map.get(ref, ref))) + elif hasattr(ref, "id"): + if ref.id not in node_id_map and ref.id not in way_id_map and ref.id not in rel_id_map: + new_id = len(rel_id_map) + 1 + rel_id_map[ref.id] = new_id + ref.id = node_id_map.get(ref.id, way_id_map.get(ref.id, rel_id_map.get(ref.id, ref.id))) + + # Ensure deterministic ordering now that IDs have been remapped if hasattr(osmnodes, "sort"): osmnodes.sort(key=lambda n: n.id) if hasattr(osmways, "sort"): osmways.sort(key=lambda w: w.id) if hasattr(osmrelations, "sort"): osmrelations.sort(key=lambda r: r.id) - - diff --git a/src/osm_osw_reformatter/serializer/osw/osw_normalizer.py b/src/osm_osw_reformatter/serializer/osw/osw_normalizer.py index e070477..751b3bf 100644 --- a/src/osm_osw_reformatter/serializer/osw/osw_normalizer.py +++ b/src/osm_osw_reformatter/serializer/osw/osw_normalizer.py @@ -1,6 +1,29 @@ import types import math -OSW_SCHEMA_ID = "https://sidewalks.washington.edu/opensidewalks/0.2/schema.json" + +OSW_SCHEMA_ID = "https://sidewalks.washington.edu/opensidewalks/0.3/schema.json" + +LEAF_CYCLE_VALUES = ( + "deciduous", + "evergreen", + "mixed", + "semi_deciduous", + "semi_evergreen", +) + +LEAF_TYPE_VALUES = ( + "broadleaved", + "leafless", + "mixed", + "needleleaved", +) + + +def _tag_value(tags, key): + if key in tags: + return tags.get(key, "") + return tags.get(f"ext:{key}", "") + class OSWWayNormalizer: @@ -151,49 +174,49 @@ def _normalize_road(self, keep_keys = {}, defaults = {}): return new_tags def is_sidewalk(self): - return (self.tags.get("highway", "") == "footway") and ( - self.tags.get("footway", "") == "sidewalk" + return (_tag_value(self.tags, "highway") == "footway") and ( + _tag_value(self.tags, "footway") == "sidewalk" ) def is_crossing(self): - return (self.tags.get("highway", "") == "footway") and ( - self.tags.get("footway", "") == "crossing" + return (_tag_value(self.tags, "highway") == "footway") and ( + _tag_value(self.tags, "footway") == "crossing" ) def is_traffic_island(self): - return (self.tags.get("highway", "") == "footway") and ( - self.tags.get("footway", "") == "traffic_island" + return (_tag_value(self.tags, "highway") == "footway") and ( + _tag_value(self.tags, "footway") == "traffic_island" ) def is_footway(self): - return (self.tags.get("highway", "") == "footway") + return (_tag_value(self.tags, "highway") == "footway") def is_stairs(self): - return self.tags.get("highway", "") == "steps" + return _tag_value(self.tags, "highway") == "steps" def is_pedestrian(self): - return self.tags.get("highway", "") == "pedestrian" + return _tag_value(self.tags, "highway") == "pedestrian" def is_living_street(self): - return self.tags.get("highway", "") == "living_street" + return _tag_value(self.tags, "highway") == "living_street" def is_driveway(self): - return (self.tags.get("highway", "") == "service") and ( - self.tags.get("service", "") == "driveway" + return (_tag_value(self.tags, "highway") == "service") and ( + _tag_value(self.tags, "service") == "driveway" ) def is_alley(self): - return (self.tags.get("highway", "") == "service") and ( - self.tags.get("service", "") == "alley" + return (_tag_value(self.tags, "highway") == "service") and ( + _tag_value(self.tags, "service") == "alley" ) def is_parking_aisle(self): - return (self.tags.get("highway", "") == "service") and ( - self.tags.get("service", "") == "parking_aisle" + return (_tag_value(self.tags, "highway") == "service") and ( + _tag_value(self.tags, "service") == "parking_aisle" ) def is_road(self): - return self.tags.get("highway", "") in self.ROAD_HIGHWAY_VALUES + return _tag_value(self.tags, "highway") in self.ROAD_HIGHWAY_VALUES class OSWNodeNormalizer: KERB_VALUES = ("flush", "lowered", "rolled", "raised") @@ -229,8 +252,11 @@ def _normalize_kerb(self, keep_keys = {}, defaults = {}): return new_tags def is_kerb(self): - return (self.tags.get("kerb", "") in self.KERB_VALUES) or ( - self.tags.get("barrier", "") == "kerb" and ("kerb" not in self.tags or self.tags.get("kerb", "") == "yes") + kerb_value = _tag_value(self.tags, "kerb") + barrier_value = _tag_value(self.tags, "barrier") + has_kerb_key = "kerb" in self.tags or "ext:kerb" in self.tags + return (kerb_value in self.KERB_VALUES) or ( + barrier_value == "kerb" and (not has_kerb_key or kerb_value == "yes") ) class OSWPointNormalizer: @@ -244,7 +270,8 @@ def filter(self): self.is_waste_basket()) or ( self.is_manhole()) or ( self.is_bollard()) or ( - self.is_street_lamp()) + self.is_street_lamp()) or ( + self.is_tree()) @staticmethod def osw_point_filter(tags): @@ -263,6 +290,14 @@ def normalize(self): return self._normalize_point({"barrier": str}) elif self.is_street_lamp(): return self._normalize_point({"highway": str}) + elif self.is_tree(): + return self._normalize_point( + { + "natural": natural_point, + "leaf_cycle": leaf_cycle, + "leaf_type": leaf_type + } + ) else: print(f"Invalid point skipped. Tags: {self.tags}") return {} @@ -275,32 +310,35 @@ def _normalize_point(self, keep_keys={}, defaults = {}): return new_tags def is_powerpole(self): - return self.tags.get("power", "") == "pole" + return _tag_value(self.tags, "power") == "pole" def is_firehydrant(self): - return self.tags.get("emergency", "") == "fire_hydrant" + return _tag_value(self.tags, "emergency") == "fire_hydrant" def is_bench(self): - return self.tags.get("amenity", "") == "bench" + return _tag_value(self.tags, "amenity") == "bench" def is_waste_basket(self): - return self.tags.get("amenity", "") == "waste_basket" + return _tag_value(self.tags, "amenity") == "waste_basket" def is_manhole(self): - return self.tags.get("man_made", "") == "manhole" + return _tag_value(self.tags, "man_made") == "manhole" def is_bollard(self): - return self.tags.get("barrier", "") == "bollard" + return _tag_value(self.tags, "barrier") == "bollard" def is_street_lamp(self): - return self.tags.get("highway", "") == "street_lamp" + return _tag_value(self.tags, "highway") == "street_lamp" + + def is_tree(self): + return _tag_value(self.tags, "natural") == "tree" class OSWLineNormalizer: def __init__(self, tags): self.tags = tags def filter(self): - return (self.is_fence()) + return (self.is_fence()) or (self.is_tree_row()) @staticmethod def osw_line_filter(tags): @@ -309,6 +347,14 @@ def osw_line_filter(tags): def normalize(self): if self.is_fence(): return self._normalize_line({"barrier": str}) + elif self.is_tree_row(): + return self._normalize_line( + { + "natural": natural_line, + "leaf_cycle": leaf_cycle, + "leaf_type": leaf_type + } + ) else: raise ValueError("This is an invalid line") @@ -320,7 +366,10 @@ def _normalize_line(self, keep_keys={}, defaults = {}): return new_tags def is_fence(self): - return self.tags.get("barrier", "") == "fence" + return _tag_value(self.tags, "barrier") == "fence" + + def is_tree_row(self): + return _tag_value(self.tags, "natural") == "tree_row" class OSWPolygonNormalizer: # Will be fetched from schema soon @@ -430,7 +479,7 @@ def __init__(self, tags): self.tags = tags def filter(self): - return self.is_building() + return self.is_building() or self.is_wood() @staticmethod def osw_polygon_filter(tags): @@ -438,7 +487,25 @@ def osw_polygon_filter(tags): def normalize(self): if self.is_building(): - return self._normalize_polygon({"building": str, "name": str, "opening_hours": str}) + return self._normalize_polygon( + { + "building": str, + "name": str, + "opening_hours": str, + "leaf_cycle": leaf_cycle, + "leaf_type": leaf_type + } + ) + elif self.is_wood(): + return self._normalize_polygon( + { + "natural": natural_polygon, + "name": str, + "opening_hours": str, + "leaf_cycle": leaf_cycle, + "leaf_type": leaf_type + } + ) else: raise ValueError("This is an invalid polygon") @@ -450,7 +517,10 @@ def _normalize_polygon(self, keep_keys={}, defaults = {}): return new_tags def is_building(self): - return self.tags.get("building", "") in self.BUILDING_VALUES + return _tag_value(self.tags, "building") in self.BUILDING_VALUES + + def is_wood(self): + return _tag_value(self.tags, "natural") == "wood" class OSWZoneNormalizer: def __init__(self, tags): @@ -477,7 +547,7 @@ def _normalize_zone(self, keep_keys={}, defaults = {}): return new_tags def is_pedestrian(self): - return self.tags.get("highway", "") == "pedestrian" + return _tag_value(self.tags, "highway") == "pedestrian" def check_nan_and_raise(tag_type, temp): @@ -538,6 +608,36 @@ def _normalize(tags, keep_keys, defaults): return {**{**new_tags, **defaults}, **{**new_tags, **ext_tags}} +def leaf_cycle(tag_value, tags): + if tag_value.lower() not in LEAF_CYCLE_VALUES: + return None + return tag_value.lower() + + +def leaf_type(tag_value, tags): + if tag_value.lower() not in LEAF_TYPE_VALUES: + return None + return tag_value.lower() + + +def natural_point(tag_value, tags): + if tag_value.lower() != "tree": + return None + return "tree" + + +def natural_line(tag_value, tags): + if tag_value.lower() != "tree_row": + return None + return "tree_row" + + +def natural_polygon(tag_value, tags): + if tag_value.lower() != "wood": + return None + return "wood" + + def tactile_paving(tag_value, tags): if tag_value.lower() not in ( "yes", diff --git a/src/osm_osw_reformatter/version.py b/src/osm_osw_reformatter/version.py index a53c568..0404d81 100644 --- a/src/osm_osw_reformatter/version.py +++ b/src/osm_osw_reformatter/version.py @@ -1 +1 @@ -__version__ = '0.2.13' +__version__ = '0.3.0' diff --git a/tests/unit_tests/test_files/tree-test.xml b/tests/unit_tests/test_files/tree-test.xml new file mode 100644 index 0000000..09e8c21 --- /dev/null +++ b/tests/unit_tests/test_files/tree-test.xml @@ -0,0 +1,34 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/unit_tests/test_osm2osw/test_osm2osw.py b/tests/unit_tests/test_osm2osw/test_osm2osw.py index 143df9b..f478d12 100644 --- a/tests/unit_tests/test_osm2osw/test_osm2osw.py +++ b/tests/unit_tests/test_osm2osw/test_osm2osw.py @@ -5,6 +5,7 @@ import unittest import math from src.osm_osw_reformatter.osm2osw.osm2osw import OSM2OSW +from src.osm_osw_reformatter.serializer.osw.osw_normalizer import OSW_SCHEMA_ID ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) OUTPUT_DIR = os.path.join(os.path.dirname(os.path.dirname(ROOT_DIR)), 'output') @@ -12,6 +13,7 @@ TEST_WIDTH_FILE = os.path.join(ROOT_DIR, 'test_files/width-test.xml') TEST_INCLINE_FILE = os.path.join(ROOT_DIR, 'test_files/incline-test.xml') TEST_INVALID_NODE_TAGS_FILE = os.path.join(ROOT_DIR, 'test_files/node_with_invalid_tags.xml') +TEST_TREE_FILE = os.path.join(ROOT_DIR, 'test_files/tree-test.xml') def is_valid_float(value): @@ -220,6 +222,67 @@ async def run_test(): asyncio.run(run_test()) + def test_outputs_use_osw_03_schema(self): + osm_file_path = TEST_FILE + + async def run_test(): + osm2osw = OSM2OSW(osm_file=osm_file_path, workdir=OUTPUT_DIR, prefix='schema03') + result = await osm2osw.convert() + self.assertTrue(result.status) + + for file_path in result.generated_files: + if file_path.endswith('.geojson'): + with open(file_path) as f: + data = json.load(f) + self.assertEqual(data.get("$schema"), OSW_SCHEMA_ID) + os.remove(file_path) + + asyncio.run(run_test()) + + def test_tree_coverage_emitted_under_osw_03(self): + osm_file_path = TEST_TREE_FILE + + async def run_test(): + osm2osw = OSM2OSW(osm_file=osm_file_path, workdir=OUTPUT_DIR, prefix='tree') + result = await osm2osw.convert() + self.assertTrue(result.status) + + points = lines = polys = None + for file_path in result.generated_files: + if file_path.endswith('.graph.points.geojson'): + points = file_path + elif file_path.endswith('.graph.lines.geojson'): + lines = file_path + elif file_path.endswith('.graph.polygons.geojson'): + polys = file_path + + self.assertIsNotNone(points, "Points output missing") + self.assertIsNotNone(lines, "Lines output missing") + self.assertIsNotNone(polys, "Polygons output missing") + + with open(points) as f: + data = json.load(f) + self.assertEqual(data.get("$schema"), OSW_SCHEMA_ID) + naturals = [feat["properties"].get("natural") for feat in data.get("features", [])] + self.assertIn("tree", naturals) + + with open(lines) as f: + data = json.load(f) + self.assertEqual(data.get("$schema"), OSW_SCHEMA_ID) + naturals = [feat["properties"].get("natural") for feat in data.get("features", [])] + self.assertIn("tree_row", naturals) + + with open(polys) as f: + data = json.load(f) + self.assertEqual(data.get("$schema"), OSW_SCHEMA_ID) + naturals = [feat["properties"].get("natural") for feat in data.get("features", [])] + self.assertIn("wood", naturals) + + for file_path in result.generated_files: + os.remove(file_path) + + asyncio.run(run_test()) + def test_retains_incline_tag(self): osm_file_path = TEST_INCLINE_FILE diff --git a/tests/unit_tests/test_osw2osm/test_osw2osm.py b/tests/unit_tests/test_osw2osm/test_osw2osm.py index c46eb48..bb05a8b 100644 --- a/tests/unit_tests/test_osw2osm/test_osw2osm.py +++ b/tests/unit_tests/test_osw2osm/test_osw2osm.py @@ -1,4 +1,7 @@ +import json import os +from pathlib import Path +import tempfile import zipfile import unittest from src.osm_osw_reformatter.osw2osm.osw2osm import OSW2OSM @@ -32,6 +35,61 @@ def _create_invalid_incline_zip(zip_path: str) -> str: return zip_path +def _create_custom_property_zip(zip_path: str, properties: dict) -> str: + """Create a temporary OSW dataset with custom/non-compliant properties.""" + os.makedirs(os.path.dirname(zip_path), exist_ok=True) + geojson = { + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "geometry": { + "type": "LineString", + "coordinates": [[0.0, 1.0], [0.0, 2.0]] + }, + "properties": { + "_id": "10", + "_u_id": "2", + "_v_id": "3", + "highway": "footway", + "foot": "yes", + **properties + } + } + ] + } + edges_path = Path(os.path.dirname(zip_path)) / "custom_edges.geojson" + with open(edges_path, "w") as fh: + json.dump(geojson, fh) + with zipfile.ZipFile(zip_path, 'w') as zf: + zf.write(edges_path, arcname='edges.geojson') + return zip_path + + +def _create_3d_node_zip(zip_path: str, z_value: float) -> str: + """Create an OSW dataset with a 3D node to test elevation extraction.""" + os.makedirs(os.path.dirname(zip_path), exist_ok=True) + nodes_geojson = { + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "geometry": { + "type": "Point", + "coordinates": [-122.363431818197, 47.6778173599078, z_value] + }, + "properties": {"_id": "n1", "kerb": "flush", "barrier": "kerb"} + } + ] + } + nodes_path = Path(os.path.dirname(zip_path)) / "nodes_3d.geojson" + with open(nodes_path, "w") as fh: + json.dump(nodes_geojson, fh) + with zipfile.ZipFile(zip_path, 'w') as zf: + zf.write(nodes_path, arcname='nodes.geojson') + return zip_path + + class TestOSW2OSM(unittest.IsolatedAsyncioTestCase): def test_convert_successful(self): zip_file = TEST_ZIP_FILE @@ -144,6 +202,7 @@ def test_invalid_incline_values_are_excluded(self): tags = {tag.get('k'): tag.get('v') for tag in way.findall('tag')} if tags.get('_id') == '2': self.assertNotIn('incline', tags) + self.assertEqual(tags.get('ext:incline'), 'steep') if tags.get('_id') == '1': self.assertEqual(tags.get('incline'), '0.1') @@ -165,6 +224,148 @@ def test_osm_elements_have_version_attribute(self): os.remove(result.generated_files) + def test_visible_elements_gain_version_attribute(self): + os.makedirs(OUTPUT_DIR, exist_ok=True) + xml_path = os.path.join(OUTPUT_DIR, 'visible_version.xml') + xml_content = """ + + + + + + + +""" + with open(xml_path, 'w') as fh: + fh.write(xml_content) + + OSW2OSM._ensure_version_attribute(Path(xml_path)) + + tree = ET.parse(xml_path) + root = tree.getroot() + + visible_nodes = [n for n in root.findall('.//node') if n.get('visible') == 'true'] + visible_ways = [w for w in root.findall('.//way') if w.get('visible') == 'true'] + + self.assertTrue(visible_nodes, "Expected at least one visible node in test fixture") + self.assertTrue(visible_ways, "Expected at least one visible way in test fixture") + + for element in visible_nodes + visible_ways: + self.assertEqual(element.get('version'), '1') + + os.remove(xml_path) + + def test_non_compliant_properties_promoted_to_ext_tags(self): + zip_path = os.path.join(OUTPUT_DIR, 'dataset_with_custom_props.zip') + props = { + "incline": "steep", + "metadata": {"color": "blue"} + } + zip_file = _create_custom_property_zip(zip_path, props) + osw2osm = OSW2OSM(zip_file_path=zip_file, workdir=OUTPUT_DIR, prefix='custom') + result = osw2osm.convert() + + tree = ET.parse(result.generated_files) + root = tree.getroot() + + way = root.find('.//way') + tags = {tag.get('k'): tag.get('v') for tag in way.findall('tag')} + + self.assertNotIn('incline', tags) + self.assertEqual(tags.get('ext:incline'), 'steep') + self.assertEqual(tags.get('ext:metadata'), '{"color": "blue"}') + + os.remove(result.generated_files) + os.remove(zip_file) + custom_edges = Path(os.path.dirname(zip_path)) / "custom_edges.geojson" + if custom_edges.exists(): + os.remove(custom_edges) + + def test_3d_coordinates_promoted_to_ext_elevation(self): + zip_path = os.path.join(OUTPUT_DIR, 'dataset_with_3d_node.zip') + zip_file = _create_3d_node_zip(zip_path, 0.0) + osw2osm = OSW2OSM(zip_file_path=zip_file, workdir=OUTPUT_DIR, prefix='elevation') + result = osw2osm.convert() + + self.assertTrue(result.status, msg=getattr(result, 'error', 'Conversion failed')) + tree = ET.parse(result.generated_files) + root = tree.getroot() + + node = root.find('.//node') + tags = {tag.get('k'): tag.get('v') for tag in node.findall('tag')} + self.assertEqual(tags.get('ext:elevation'), '0.0') + + if result.generated_files: + os.remove(result.generated_files) + if zip_file and os.path.exists(zip_file): + os.remove(zip_file) + nodes_path = Path(os.path.dirname(zip_path)) / "nodes_3d.geojson" + if nodes_path.exists(): + os.remove(nodes_path) + + def test_ids_are_sequential_per_type(self): + zip_file = TEST_WIDTH_ZIP_FILE + osw2osm = OSW2OSM(zip_file_path=zip_file, workdir=OUTPUT_DIR, prefix='sequential') + result = osw2osm.convert() + + OSW2OSM._remap_ids_to_sequential(Path(result.generated_files)) + tree = ET.parse(result.generated_files) + root = tree.getroot() + + node_ids = sorted(int(n.get('id')) for n in root.findall('.//node')) + way_ids = sorted(int(w.get('id')) for w in root.findall('.//way')) + rel_ids = sorted(int(r.get('id')) for r in root.findall('.//relation')) + + self.assertEqual(node_ids, list(range(1, len(node_ids) + 1))) + self.assertEqual(way_ids, list(range(1, len(way_ids) + 1))) + if rel_ids: + self.assertEqual(rel_ids, list(range(1, len(rel_ids) + 1))) + + os.remove(result.generated_files) + + def test_remap_ids_rewrites_refs(self): + with tempfile.TemporaryDirectory() as tmpdir: + xml_path = Path(tmpdir, "sample.osm.xml") + xml_path.write_text( + """ + + + + + + + + + + + +""" + ) + + OSW2OSM._remap_ids_to_sequential(xml_path) + + root = ET.parse(xml_path).getroot() + node_ids = [n.get("id") for n in root.findall(".//node")] + way_ids = [w.get("id") for w in root.findall(".//way")] + rel_ids = [r.get("id") for r in root.findall(".//relation")] + + self.assertEqual(node_ids, ["1", "2"]) + self.assertEqual(way_ids, ["1"]) + self.assertEqual(rel_ids, ["1"]) + + nd_refs = [nd.get("ref") for nd in root.findall(".//way/nd")] + self.assertEqual(nd_refs, ["1", "2"]) + + member_refs = [(m.get("type"), m.get("ref")) for m in root.findall(".//relation/member")] + self.assertEqual(member_refs, [("node", "2"), ("way", "1")]) + + node_tag_ids = [tag.get("v") for tag in root.findall(".//node/tag[@k='_id']")] + self.assertEqual(node_tag_ids, ["1", "2"]) + way_tag_ids = [tag.get("v") for tag in root.findall(".//way/tag[@k='_id']")] + self.assertEqual(way_tag_ids, ["1"]) + rel_tag_ids = [tag.get("v") for tag in root.findall(".//relation/tag[@k='_id']")] + self.assertEqual(rel_tag_ids, ["1"]) + if __name__ == '__main__': unittest.main() diff --git a/tests/unit_tests/test_serializer/test_osm_graph.py b/tests/unit_tests/test_serializer/test_osm_graph.py index e5a8bbf..8d54635 100644 --- a/tests/unit_tests/test_serializer/test_osm_graph.py +++ b/tests/unit_tests/test_serializer/test_osm_graph.py @@ -4,7 +4,7 @@ from tempfile import TemporaryDirectory from unittest.mock import MagicMock, patch import networkx as nx -from shapely.geometry import LineString, Point, Polygon, mapping +from shapely.geometry import LineString, Point, Polygon, mapping, shape from src.osm_osw_reformatter.serializer.osm.osm_graph import ( OSMGraph, OSMWayParser, @@ -52,6 +52,10 @@ def filter_func(u, v, d): self.assertEqual( list(filtered_graph.get_graph().edges(data=True))[0][2]["property"], "A" ) + # Node data copied + self.mock_graph.add_node(1, foo="bar") + filtered_graph = self.osm_graph.filter_edges(filter_func) + self.assertEqual(filtered_graph.get_graph().nodes[1].get("foo"), "bar") def test_node_adds_tagged_node_with_coordinates(self): class DummyNode: @@ -189,6 +193,114 @@ def test_to_geojson(self, mock_mapping): if os.path.exists(path): os.remove(path) + def test_osm_line_parser_skips_invalid_locations(self): + G = nx.MultiDiGraph() + parser = OSMLineParser(G) + + class DummyLoc: + def __init__(self, lon, lat, valid=True): + self.lon = lon + self.lat = lat + self._valid = valid + + def valid(self): + return self._valid + + class DummyNode: + def __init__(self, lon, lat, ref, valid=True): + self.location = DummyLoc(lon, lat, valid) + self.ref = ref + self.lon = lon + self.lat = lat + + class DummyWay: + def __init__(self): + self.id = 1 + self.tags = {"barrier": "fence"} + self.nodes = [ + DummyNode(0, 0, 10, valid=False), + DummyNode(1, 1, 11, valid=True), + ] + + parser.way(DummyWay()) + self.assertIn("l1", G.nodes) + ndref = G.nodes["l1"]["ndref"] + self.assertEqual(len(ndref), 1) + self.assertEqual(ndref[0], [1.0, 1.0]) + + def test_osm_zone_parser_multiple_exteriors(self): + G = nx.MultiDiGraph() + parser = OSMZoneParser(G) + + class DummyRing(list): + def __init__(self, coords): + super().__init__([self._node(i, c) for i, c in enumerate(coords)]) + + @staticmethod + def _node(idx, coord): + class Node: + def __init__(self, idx, coord): + self.ref = idx + 1000 + self.lon = coord[0] + self.lat = coord[1] + + return Node(idx, coord) + + class DummyArea: + def __init__(self): + self.id = 5 + self.tags = {"highway": "pedestrian"} + self._outers = [ + DummyRing([(0, 0), (1, 0), (1, 1)]), + DummyRing([(2, 2), (3, 2), (3, 3)]), + ] + + def outer_rings(self): + return self._outers + + def inner_rings(self, exterior): + return [] + + parser.area(DummyArea()) + self.assertIn("z5", G.nodes) + self.assertIn("z51", G.nodes) + + def test_osm_polygon_parser_multiple_exteriors(self): + G = nx.MultiDiGraph() + parser = OSMPolygonParser(G) + + class DummyRing(list): + def __init__(self, coords): + super().__init__([self._node(c) for c in coords]) + + @staticmethod + def _node(coord): + class Node: + def __init__(self, coord): + self.lon = coord[0] + self.lat = coord[1] + + return Node(coord) + + class DummyArea: + def __init__(self): + self.id = 7 + self.tags = {"building": "yes"} + self._outers = [ + DummyRing([(0, 0), (1, 0), (1, 1)]), + DummyRing([(2, 2), (3, 2), (3, 3)]), + ] + + def outer_rings(self): + return self._outers + + def inner_rings(self, exterior): + return [] + + parser.area(DummyArea()) + self.assertIn("g7", G.nodes) + self.assertIn("g71", G.nodes) + def test_simplify(self): self.mock_graph.add_node(1) self.mock_graph.add_node(2) @@ -698,6 +810,61 @@ def test_to_geojson_point_ids_trim_prefix(self): self.assertEqual(props['_id'], '123') self.assertEqual(props['ext:osm_id'], '123') + def test_to_undirected_on_simple_graph(self): + g = nx.Graph() + g.add_edge(1, 2) + osm_graph = OSMGraph(G=g) + undirected = osm_graph.to_undirected() + self.assertIsInstance(undirected.get_graph(), nx.Graph) + self.assertFalse(undirected.get_graph().is_multigraph()) + + def test_filter_edges_preserves_node_attributes(self): + g = nx.MultiDiGraph() + g.add_node(1, keep=True, label="a") + g.add_node(2, keep=True, label="b") + g.add_edge(1, 2, keep=True) + osm_graph = OSMGraph(G=g) + filtered = osm_graph.filter_edges(lambda u, v, d: d.get("keep")) + self.assertIn(1, filtered.get_graph().nodes) + self.assertEqual(filtered.get_graph().nodes[1].get("label"), "a") + + def test_from_geojson_reads_features(self): + with TemporaryDirectory() as tmpdir: + nodes_path = os.path.join(tmpdir, "nodes.geojson") + edges_path = os.path.join(tmpdir, "edges.geojson") + + nodes_fc = { + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "geometry": {"type": "Point", "coordinates": [0, 0]}, + "properties": {"_id": "1", "name": "node1"}, + } + ], + } + edges_fc = { + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "geometry": {"type": "LineString", "coordinates": [[0, 0], [1, 1]]}, + "properties": {"_id": "e1", "_u_id": "1", "_v_id": "1", "highway": "footway"}, + } + ], + } + + with open(nodes_path, "w") as f: + json.dump(nodes_fc, f) + with open(edges_path, "w") as f: + json.dump(edges_fc, f) + + graph = OSMGraph.from_geojson(nodes_path, edges_path).get_graph() + self.assertIsInstance(graph, nx.MultiDiGraph) + self.assertIn("1", graph.nodes) + self.assertEqual(graph.nodes["1"]["name"], "node1") + self.assertEqual(len(list(graph.edges(data=True))), 1) + if __name__ == "__main__": diff --git a/tests/unit_tests/test_serializer/test_osm_osm_normalizer.py b/tests/unit_tests/test_serializer/test_osm_osm_normalizer.py new file mode 100644 index 0000000..b74dd1d --- /dev/null +++ b/tests/unit_tests/test_serializer/test_osm_osm_normalizer.py @@ -0,0 +1,210 @@ +import math +import unittest +import json + +import importlib.util +import sys +import types +from pathlib import Path + +# Stub ogr2osm before loading the module under test +sys.modules['ogr2osm'] = types.SimpleNamespace(TranslationBase=object) + +module_path = Path(__file__).resolve().parents[3] / 'src/osm_osw_reformatter/serializer/osm/osm_normalizer.py' +spec = importlib.util.spec_from_file_location('osm_normalizer', module_path) +osn = importlib.util.module_from_spec(spec) +spec.loader.exec_module(osn) + +OSMNormalizer = osn.OSMNormalizer + + +class DummyOsmGeometry: + def __init__(self, tags=None, osm_id=-1): + self.tags = tags or {} + self.id = osm_id + + +class DummyMember: + def __init__(self, ref): + self.ref = ref + + +class DummyRel: + def __init__(self, rel_id, members=None): + self.id = rel_id + self.members = members or [] + self.tags = {} + + +class DummyOgrGeom: + def __init__(self, point): + self.point = point + + def GetCoordinateDimension(self): + return len(self.point) + + def GetPointCount(self): + return 1 + + def GetPoint(self, idx): + return self.point + + def GetGeometryCount(self): + return 0 + + def GetGeometryRef(self, idx): + return None + + +class TestOSMNormalizer(unittest.TestCase): + def setUp(self): + self.normalizer = OSMNormalizer() + + def test_stash_ext_canonicalizes_json(self): + tags = {} + self.normalizer._stash_ext(tags, 'detail', '{ "name": "Bob" }') + self.assertEqual(json.loads(tags['ext:detail']), {"name": "Bob"}) + + tags2 = {} + self.normalizer._stash_ext(tags2, 'meta', {"foo": "bar"}) + self.assertEqual(json.loads(tags2['ext:meta']), {"foo": "bar"}) + + def test_stash_ext_handles_exceptions(self): + class Bad: + def __str__(self): + raise RuntimeError("boom") + tags = {} + with self.assertRaises(RuntimeError): + self.normalizer._stash_ext(tags, 'bad', Bad()) + + def test_stash_ext_ignores_none(self): + tags = {} + self.normalizer._stash_ext(tags, 'nothing', None) + self.assertNotIn('ext:nothing', tags) + + def test_stash_ext_invalid_json_string(self): + tags = {} + # Invalid JSON should fall through the final canonicalization except block + self.normalizer._stash_ext(tags, 'bad_json', '{oops}') + self.assertEqual(tags['ext:bad_json'], '{oops}') + + def test_filter_tags_moves_unknown_and_invalid_datatypes(self): + tags = { + 'highway': 'footway', + 'foot': 'yes', + 'climb': 'sideways', + 'incline': 'steep', + 'width': 'NaN', + 'mystery': 'value', + 'nested': {'a': 1}, + 'listval': [1, 2], + } + result = self.normalizer.filter_tags(tags) + self.assertNotIn('foot', result) + self.assertNotIn('climb', result) + self.assertNotIn('incline', result) + self.assertNotIn('width', result) + self.assertNotIn('mystery', result) + self.assertNotIn('nested', result) + self.assertNotIn('listval', result) + self.assertEqual(result.get('highway'), 'footway') + self.assertEqual(result.get('ext:climb'), 'sideways') + self.assertEqual(result.get('ext:incline'), 'steep') + self.assertEqual(result.get('ext:width'), 'NaN') + self.assertEqual(result.get('ext:mystery'), 'value') + self.assertEqual(json.loads(result.get('ext:nested')), {"a": 1}) + self.assertEqual(json.loads(result.get('ext:listval')), [1, 2]) + + def test_filter_tags_adds_area_for_zones(self): + tags = {'highway': 'pedestrian', '_w_id': '1'} + result = self.normalizer.filter_tags(tags) + self.assertEqual(result.get('area'), 'yes') + + def test_process_feature_post_sets_id_and_elevation(self): + osmgeom = DummyOsmGeometry(tags={'ext:osm_id': ['10']}, osm_id=-5) + ogrgeom = DummyOgrGeom((1.0, 2.0, 3.0)) + self.normalizer.process_feature_post(osmgeom, None, ogrgeom) + self.assertEqual(osmgeom.id, 10) + self.assertEqual(osmgeom.tags.get('ext:elevation'), ['3.0'] if isinstance(osmgeom.tags.get('ext:elevation'), list) else '3.0') + + def test_process_feature_post_set_tag_branches(self): + # tags contains list key -> replace with list + osmgeom_list = DummyOsmGeometry(tags={'ext:elevation': ['x'], '_id': ['2']}, osm_id=2) + self.normalizer.process_feature_post(osmgeom_list, None, DummyOgrGeom((0, 0, 2))) + self.assertEqual(osmgeom_list.tags.get('ext:elevation'), ['2.0']) + # tags contains scalar key -> overwrite scalar + osmgeom_scalar = DummyOsmGeometry(tags={'ext:elevation': 'old', '_id': ['3']}, osm_id=3) + self.normalizer.process_feature_post(osmgeom_scalar, None, DummyOgrGeom((0, 0, 3))) + self.assertEqual(osmgeom_scalar.tags.get('ext:elevation'), '3.0') + + def test_extract_elevation_rejects_nan(self): + self.assertIsNone(self.normalizer._extract_elevation(DummyOgrGeom((0, 0, float('nan'))))) + class BadGeom: + def GetCoordinateDimension(self): + raise RuntimeError("bad") + self.assertIsNone(self.normalizer._extract_elevation(BadGeom())) + class BadGeom2: + def GetCoordinateDimension(self): + return 3 + def GetPointCount(self): + raise RuntimeError("bad") + def GetGeometryCount(self): + raise RuntimeError("bad2") + self.assertIsNone(self.normalizer._extract_elevation(BadGeom2())) + # Not enough coordinates + class TwoDGeom: + def GetCoordinateDimension(self): + return 3 + def GetPointCount(self): + return 1 + def GetPoint(self, idx): + return (1, 2) + def GetGeometryCount(self): + return 0 + self.assertIsNone(self.normalizer._extract_elevation(TwoDGeom())) + # Non-numeric z + self.assertIsNone(self.normalizer._extract_elevation(DummyOgrGeom((0, 0, 'abc')))) + + def test_process_output_normalizes_negative_ids(self): + node = DummyOsmGeometry(tags={'_id': ['-1']}, osm_id=-1) + way = DummyOsmGeometry(tags={'_id': ['-2']}, osm_id=-2) + class RefObj: + def __init__(self, rid): + self.id = rid + way.refs = [RefObj(-1)] + rel_member = DummyMember(ref=RefObj(-3)) + rel = DummyRel(-4, members=[rel_member]) + self.normalizer.process_output([node], [way], [rel]) + + self.assertGreaterEqual(node.id, 0) + self.assertGreaterEqual(way.id, 0) + self.assertGreaterEqual(rel.id, 0) + self.assertGreaterEqual(rel.members[0].ref.id, 0) + refs = getattr(way, 'refs', []) + self.assertTrue(all(getattr(r, 'id', r) >= 0 for r in refs)) + # _id tag updated + tag_val = node.tags.get('_id') + if isinstance(tag_val, list): + self.assertIn(str(node.id), tag_val) + else: + self.assertEqual(tag_val, str(node.id)) + + def test_process_output_handles_various_refs_and_attrs(self): + way = DummyOsmGeometry(tags={'_id': ['-9']}, osm_id=-9) + way.nodeRefs = [-1, DummyOsmGeometry(tags={}, osm_id=-2), "raw"] + rel_member = DummyMember(ref=-5) + rel = DummyRel(-6, members=[rel_member]) + self.normalizer.process_output([], [way], [rel]) + refs = getattr(way, "nds", None) or getattr(way, "refs", None) or getattr(way, "nodeRefs", None) or getattr(way, "nodes", None) + for r in refs: + if isinstance(r, int): + self.assertGreaterEqual(r, 0) + elif hasattr(r, 'id'): + self.assertGreaterEqual(r.id, 0) + else: + self.assertIsInstance(r, str) + self.assertGreaterEqual(rel.members[0].ref, 0) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/unit_tests/test_serializer/test_osw_normalizer.py b/tests/unit_tests/test_serializer/test_osw_normalizer.py index 1188b47..d0702a7 100644 --- a/tests/unit_tests/test_serializer/test_osw_normalizer.py +++ b/tests/unit_tests/test_serializer/test_osw_normalizer.py @@ -10,11 +10,19 @@ OSWWayNormalizer = osw_normalizer.OSWWayNormalizer OSWNodeNormalizer = osw_normalizer.OSWNodeNormalizer OSWPointNormalizer = osw_normalizer.OSWPointNormalizer +OSWLineNormalizer = osw_normalizer.OSWLineNormalizer +OSWPolygonNormalizer = osw_normalizer.OSWPolygonNormalizer +OSWZoneNormalizer = osw_normalizer.OSWZoneNormalizer tactile_paving = osw_normalizer.tactile_paving surface = osw_normalizer.surface crossing_markings = osw_normalizer.crossing_markings climb = osw_normalizer.climb incline = osw_normalizer.incline +leaf_cycle = osw_normalizer.leaf_cycle +leaf_type = osw_normalizer.leaf_type +natural_point = osw_normalizer.natural_point +natural_line = osw_normalizer.natural_line +natural_polygon = osw_normalizer.natural_polygon _normalize = osw_normalizer._normalize @@ -24,6 +32,11 @@ def test_is_sidewalk(self): normalizer = OSWWayNormalizer(tags) self.assertTrue(normalizer.is_sidewalk()) + def test_is_sidewalk_with_ext_tags(self): + tags = {'ext:highway': 'footway', 'ext:footway': 'sidewalk'} + normalizer = OSWWayNormalizer(tags) + self.assertTrue(normalizer.is_sidewalk()) + def test_is_crossing(self): tags = {'highway': 'footway', 'footway': 'crossing'} normalizer = OSWWayNormalizer(tags) @@ -125,6 +138,12 @@ def test_normalize_invalid_way(self): with self.assertRaises(ValueError): normalizer.normalize() + def test_osw_way_filter_helper(self): + tags = {'highway': 'footway', 'footway': 'sidewalk'} + self.assertTrue(OSWWayNormalizer.osw_way_filter(tags)) + tags = {'highway': 'motorway'} + self.assertFalse(OSWWayNormalizer.osw_way_filter(tags)) + class TestOSWNodeNormalizer(unittest.TestCase): def test_is_kerb(self): @@ -132,6 +151,16 @@ def test_is_kerb(self): normalizer = OSWNodeNormalizer(tags) self.assertTrue(normalizer.is_kerb()) + def test_is_kerb_with_ext_tags(self): + tags = {'ext:kerb': 'flush'} + normalizer = OSWNodeNormalizer(tags) + self.assertTrue(normalizer.is_kerb()) + + def test_is_kerb_with_ext_barrier_only(self): + tags = {'ext:barrier': 'kerb'} + normalizer = OSWNodeNormalizer(tags) + self.assertTrue(normalizer.is_kerb()) + def test_is_kerb_invalid(self): tags = {'kerb': 'invalid_type'} normalizer = OSWNodeNormalizer(tags) @@ -169,6 +198,99 @@ def test_normalize_powerpole(self): expected = {'power': 'pole'} self.assertEqual(result, expected) + def test_normalize_fire_hydrant(self): + tags = {'emergency': 'fire_hydrant', 'name': 'Hydrant 42'} + normalizer = OSWPointNormalizer(tags) + result = normalizer.normalize() + expected = {'emergency': 'fire_hydrant'} + self.assertEqual(result, expected) + + def test_invalid_point_is_logged_and_skipped(self): + tags = {'amenity': 'unknown'} + normalizer = OSWPointNormalizer(tags) + result = normalizer.normalize() + self.assertEqual(result, {}) + + def test_is_tree(self): + tags = {'natural': 'tree'} + normalizer = OSWPointNormalizer(tags) + self.assertTrue(normalizer.is_tree()) + + def test_is_tree_with_ext_tags(self): + tags = {'ext:natural': 'tree'} + normalizer = OSWPointNormalizer(tags) + self.assertTrue(normalizer.is_tree()) + + def test_normalize_tree(self): + tags = {'natural': 'tree', 'leaf_cycle': 'Deciduous', 'leaf_type': 'needleLeaved'} + normalizer = OSWPointNormalizer(tags) + result = normalizer.normalize() + expected = {'natural': 'tree', 'leaf_cycle': 'deciduous', 'leaf_type': 'needleleaved'} + self.assertEqual(result, expected) + + +class TestOSWLineNormalizer(unittest.TestCase): + def test_is_tree_row(self): + tags = {'natural': 'tree_row'} + normalizer = OSWLineNormalizer(tags) + self.assertTrue(normalizer.is_tree_row()) + + def test_is_fence_with_ext_tags(self): + tags = {'ext:barrier': 'fence'} + normalizer = OSWLineNormalizer(tags) + self.assertTrue(normalizer.is_fence()) + + def test_normalize_tree_row(self): + tags = {'natural': 'tree_row', 'leaf_cycle': 'evergreen', 'leaf_type': 'leafless'} + normalizer = OSWLineNormalizer(tags) + result = normalizer.normalize() + expected = {'natural': 'tree_row', 'leaf_cycle': 'evergreen', 'leaf_type': 'leafless'} + self.assertEqual(result, expected) + + def test_invalid_line_raises(self): + tags = {'barrier': 'wall'} + normalizer = OSWLineNormalizer(tags) + with self.assertRaises(ValueError): + normalizer.normalize() + + +class TestOSWPolygonNormalizer(unittest.TestCase): + def test_is_wood(self): + tags = {'natural': 'wood'} + normalizer = OSWPolygonNormalizer(tags) + self.assertTrue(normalizer.is_wood()) + + def test_is_building_with_ext_tags(self): + tags = {'ext:building': 'yes'} + normalizer = OSWPolygonNormalizer(tags) + self.assertTrue(normalizer.is_building()) + + def test_normalize_wood(self): + tags = {'natural': 'wood', 'leaf_cycle': 'mixed', 'leaf_type': 'broadleaved'} + normalizer = OSWPolygonNormalizer(tags) + result = normalizer.normalize() + expected = {'natural': 'wood', 'leaf_cycle': 'mixed', 'leaf_type': 'broadleaved'} + self.assertEqual(result, expected) + + def test_invalid_polygon_raises(self): + tags = {'natural': 'meadow'} + normalizer = OSWPolygonNormalizer(tags) + with self.assertRaises(ValueError): + normalizer.normalize() + + +class TestOSWZoneNormalizer(unittest.TestCase): + def test_invalid_zone_raises(self): + tags = {'highway': 'residential'} + normalizer = OSWZoneNormalizer(tags) + with self.assertRaises(ValueError): + normalizer.normalize() + + def test_is_pedestrian_with_ext_tags(self): + tags = {'ext:highway': 'pedestrian'} + normalizer = OSWZoneNormalizer(tags) + self.assertTrue(normalizer.is_pedestrian()) + class TestCommonFunctions(unittest.TestCase): def test_tactile_paving(self): @@ -182,6 +304,9 @@ def test_surface(self): self.assertEqual(surface('concrete', {}), 'concrete') self.assertIsNone(surface('invalid_surface', {})) + def test_crossing_markings_from_zebra_crossing_tag(self): + self.assertEqual(crossing_markings('', {'crossing': 'zebra'}), 'zebra') + def test_crossing_markings(self): self.assertEqual(crossing_markings('dashes', {'crossing:markings': 'dashes'}), 'dashes') self.assertEqual(crossing_markings('dots', {'crossing:markings': 'dots'}), 'dots') @@ -197,6 +322,31 @@ def test_incline(self): self.assertEqual(incline('0.5', {}), 0.5) self.assertIsNone(incline('steep', {})) + def test_leaf_cycle(self): + self.assertEqual(leaf_cycle('Semi_deciduous', {}), 'semi_deciduous') + self.assertIsNone(leaf_cycle('unknown', {})) + + def test_leaf_type(self): + self.assertEqual(leaf_type('BroadLeaved', {}), 'broadleaved') + self.assertIsNone(leaf_type('fake', {})) + + def test_kerb(self): + self.assertEqual(osw_normalizer.kerb('flush', {}), 'flush') + self.assertIsNone(osw_normalizer.kerb('invalid', {})) + + def test_foot(self): + self.assertEqual(osw_normalizer.foot('designated', {}), 'designated') + self.assertIsNone(osw_normalizer.foot('spaceship', {})) + + def test_natural_point_invalid(self): + self.assertIsNone(natural_point('bush', {})) + + def test_natural_line_invalid(self): + self.assertIsNone(natural_line('hedge', {})) + + def test_natural_polygon_invalid(self): + self.assertIsNone(natural_polygon('meadow', {})) + class TestNormalizeWidthField(unittest.TestCase): def test_removes_width_when_value_is_nan_string(self): @@ -254,6 +404,12 @@ def test_preserves_width_when_value_is_float_with_string(self): self.assertIn('width', normalizer) self.assertEqual(normalizer['width'], 1.525) + def test_normalize_sets_literal_defaults(self): + tags = {"foo": "bar"} + keep_keys = {"foo": ["ext:foo_copy", "bar"]} + result = _normalize(tags, keep_keys, {}) + self.assertEqual(result.get("ext:foo_copy"), "bar") + if __name__ == '__main__':