diff --git a/CHANGELOG.md b/CHANGELOG.md index c10255e..caa7eeb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Change log +### 0.2.11 +- Retain numeric `incline` values and new `length` tags during way normalization +- Recognize any `highway=steps` way as stairs, preserving valid `climb` tags +- Add compliance test verifying `incline` survives OSW→OSM→OSW roundtrips + ### 0.2.10 - Removed `climb` tag generation from OSM normalizer diff --git a/src/osm_osw_reformatter/serializer/osm/osm_normalizer.py b/src/osm_osw_reformatter/serializer/osm/osm_normalizer.py index 3a51033..be215af 100644 --- a/src/osm_osw_reformatter/serializer/osm/osm_normalizer.py +++ b/src/osm_osw_reformatter/serializer/osm/osm_normalizer.py @@ -51,10 +51,12 @@ def filter_tags(self, tags): if 'incline' in tags: try: - incline_val = float(str(tags['incline']).rstrip('%')) + incline_val = float(str(tags['incline'])) except (ValueError, TypeError): - pass + # Drop the incline tag if it cannot be interpreted as a float + tags.pop('incline', '') else: + # Normalise numeric incline values by casting to string tags['incline'] = str(incline_val) self._check_datatypes(tags) @@ -75,4 +77,4 @@ def process_feature_post(self, osmgeometry, ogrfeature, ogrgeometry): osm_id = int(osmgeometry.tags['_id'][0]) if osm_id is not None: - osmgeometry.id = osm_id \ No newline at end of file + osmgeometry.id = osm_id diff --git a/src/osm_osw_reformatter/serializer/osw/osw_normalizer.py b/src/osm_osw_reformatter/serializer/osw/osw_normalizer.py index 5d7255e..eec828b 100644 --- a/src/osm_osw_reformatter/serializer/osw/osw_normalizer.py +++ b/src/osm_osw_reformatter/serializer/osw/osw_normalizer.py @@ -18,6 +18,11 @@ class OSWWayNormalizer: "trunk_link" ) + CLIMB_VALUES = ( + "up", + "down", + ) + def __init__(self, tags): self.tags = tags @@ -67,7 +72,16 @@ def normalize(self): raise ValueError("This is an invalid way") def _normalize_way(self, keep_keys={}, defaults = {}): - generic_keep_keys = {"highway": str, "width": float, "surface": surface, "name": str, "description": str, "foot": foot} + generic_keep_keys = { + "highway": str, + "width": float, + "surface": surface, + "name": str, + "description": str, + "foot": foot, + "incline": incline, + "length": float, + } generic_defaults = {} new_tags = _normalize(self.tags, {**generic_keep_keys, **keep_keys}, {**generic_defaults, **defaults}) @@ -81,7 +95,7 @@ def _normalize_pedestrian(self, keep_keys = {}, defaults = {}): return new_tags def _normalize_stairs(self, keep_keys = {}, defaults = {}): - generic_keep_keys = {"step_count": int, "incline": ["climb", climb]} + generic_keep_keys = {"step_count": int, "climb": climb} generic_defaults = {"foot": "yes"} new_tags = self._normalize_way({**generic_keep_keys, **keep_keys}, {**generic_defaults, **defaults}) @@ -584,13 +598,16 @@ def crossing_markings(tag_value, tags): return None def climb(tag_value, tags): - if tag_value.lower() not in ( - "up", - "down" - ): + if tag_value.lower() not in OSWWayNormalizer.CLIMB_VALUES: return None else: return tag_value.lower() + +def incline(tag_value, tags): + try: + return float(str(tag_value)) + except (ValueError, TypeError): + return None def foot(tag_value, tags): if tag_value.lower() not in ( diff --git a/src/osm_osw_reformatter/version.py b/src/osm_osw_reformatter/version.py index e913eb4..c5b683c 100644 --- a/src/osm_osw_reformatter/version.py +++ b/src/osm_osw_reformatter/version.py @@ -1 +1 @@ -__version__ = '0.2.10' +__version__ = '0.2.11' diff --git a/tests/unit_tests/test_files/edges_invalid_incline.geojson b/tests/unit_tests/test_files/edges_invalid_incline.geojson new file mode 100644 index 0000000..c465428 --- /dev/null +++ b/tests/unit_tests/test_files/edges_invalid_incline.geojson @@ -0,0 +1,12 @@ +{"type": "FeatureCollection", "features": [ + { + "type": "Feature", + "geometry": {"type": "LineString", "coordinates": [[0.0, 0.0], [0.0, 1.0]]}, + "properties": {"_id": "1", "_u_id": "1", "_v_id": "2", "highway": "footway", "foot": "yes", "incline": 0.1} + }, + { + "type": "Feature", + "geometry": {"type": "LineString", "coordinates": [[0.0, 1.0], [0.0, 2.0]]}, + "properties": {"_id": "2", "_u_id": "2", "_v_id": "3", "highway": "footway", "foot": "yes", "incline": "steep"} + } +]} diff --git a/tests/unit_tests/test_files/incline-test.xml b/tests/unit_tests/test_files/incline-test.xml new file mode 100644 index 0000000..bfb25d0 --- /dev/null +++ b/tests/unit_tests/test_files/incline-test.xml @@ -0,0 +1,12 @@ + + + + + + + + + + + + diff --git a/tests/unit_tests/test_files/nodes_invalid_incline.geojson b/tests/unit_tests/test_files/nodes_invalid_incline.geojson new file mode 100644 index 0000000..a816d42 --- /dev/null +++ b/tests/unit_tests/test_files/nodes_invalid_incline.geojson @@ -0,0 +1,5 @@ +{"type": "FeatureCollection", "features": [ + {"type": "Feature", "geometry": {"type": "Point", "coordinates": [0.0, 0.0]}, "properties": {"_id": "1"}}, + {"type": "Feature", "geometry": {"type": "Point", "coordinates": [0.0, 1.0]}, "properties": {"_id": "2"}}, + {"type": "Feature", "geometry": {"type": "Point", "coordinates": [0.0, 2.0]}, "properties": {"_id": "3"}} +]} diff --git a/tests/unit_tests/test_osm2osw/test_osm2osw.py b/tests/unit_tests/test_osm2osw/test_osm2osw.py index e0402df..00b8cfe 100644 --- a/tests/unit_tests/test_osm2osw/test_osm2osw.py +++ b/tests/unit_tests/test_osm2osw/test_osm2osw.py @@ -10,6 +10,7 @@ OUTPUT_DIR = os.path.join(os.path.dirname(os.path.dirname(ROOT_DIR)), 'output') TEST_FILE = os.path.join(ROOT_DIR, 'test_files/wa.microsoft.osm.pbf') TEST_WIDTH_FILE = os.path.join(ROOT_DIR, 'test_files/width-test.xml') +TEST_INCLINE_FILE = os.path.join(ROOT_DIR, 'test_files/incline-test.xml') def is_valid_float(value): @@ -218,6 +219,33 @@ async def run_test(): asyncio.run(run_test()) + def test_retains_incline_tag(self): + osm_file_path = TEST_INCLINE_FILE + + async def run_test(): + osm2osw = OSM2OSW(osm_file=osm_file_path, workdir=OUTPUT_DIR, prefix='test') + result = await osm2osw.convert() + self.assertTrue(result.status) + + found_incline = False + for file_path in result.generated_files: + if file_path.endswith('edges.geojson'): + with open(file_path) as f: + geojson = json.load(f) + for feature in geojson.get('features', []): + props = feature.get('properties', {}) + if 'incline' in props: + self.assertIsInstance(props['incline'], (int, float)) + found_incline = True + break + + for file_path in result.generated_files: + os.remove(file_path) + + self.assertTrue(found_incline, 'Incline tag not found in output edges') + + asyncio.run(run_test()) + if __name__ == '__main__': unittest.main() diff --git a/tests/unit_tests/test_osm_compliance/test_osm_compliance.py b/tests/unit_tests/test_osm_compliance/test_osm_compliance.py index 8a90275..c21f1b7 100644 --- a/tests/unit_tests/test_osm_compliance/test_osm_compliance.py +++ b/tests/unit_tests/test_osm_compliance/test_osm_compliance.py @@ -36,3 +36,36 @@ async def test_output_is_osm_compliant(self): os.remove(f) os.remove(zip_path) formatter.cleanup() + + async def test_incline_tag_preserved(self): + osw2osm = OSW2OSM( + zip_file_path=TEST_DATA_WITH_INCLINE_ZIP_FILE, + workdir=OUTPUT_DIR, + prefix='incline' + ) + result = osw2osm.convert() + osm_file = result.generated_files + + formatter = Formatter(workdir=OUTPUT_DIR, file_path=osm_file, prefix='incline') + res = await formatter.osm2osw() + osw_files = res.generated_files + + found_incline = False + for f in osw_files: + if f.endswith('.geojson'): + with open(f) as fh: + data = json.load(fh) + for feature in data.get('features', []): + props = feature.get('properties', {}) + if 'incline' in props: + found_incline = True + break + if found_incline: + break + + self.assertTrue(found_incline, 'No incline tag found in OSW output') + + os.remove(osm_file) + for f in osw_files: + os.remove(f) + formatter.cleanup() diff --git a/tests/unit_tests/test_osw2osm/test_osw2osm.py b/tests/unit_tests/test_osw2osm/test_osw2osm.py index d862fcd..f877fec 100644 --- a/tests/unit_tests/test_osw2osm/test_osw2osm.py +++ b/tests/unit_tests/test_osw2osm/test_osw2osm.py @@ -1,4 +1,5 @@ import os +import zipfile import unittest from src.osm_osw_reformatter.osw2osm.osw2osm import OSW2OSM import xml.etree.ElementTree as ET @@ -8,6 +9,27 @@ TEST_ZIP_FILE = os.path.join(ROOT_DIR, 'test_files/osw.zip') TEST_WIDTH_ZIP_FILE = os.path.join(ROOT_DIR, 'test_files/width-test.zip') TEST_DATA_WITH_INCLINE_ZIP_FILE = os.path.join(ROOT_DIR, 'test_files/dataset_with_incline.zip') +TEST_EDGES_WITH_INVALID_INCLINE_FILE = os.path.join(ROOT_DIR, 'test_files/edges_invalid_incline.geojson') +TEST_NODES_WITH_INVALID_INCLINE_FILE = os.path.join(ROOT_DIR, 'test_files/nodes_invalid_incline.geojson') + + +def _create_invalid_incline_zip(zip_path: str) -> str: + """Create a temporary OSW dataset with invalid incline values. + + The resulting ZIP mirrors the structure expected by ``OSW2OSM`` and contains + both ``edges.geojson`` and ``nodes.geojson`` files. + + Args: + zip_path: Location where the archive will be written. + + Returns: + The path to the generated ZIP archive. + """ + os.makedirs(os.path.dirname(zip_path), exist_ok=True) + with zipfile.ZipFile(zip_path, 'w') as zf: + zf.write(TEST_EDGES_WITH_INVALID_INCLINE_FILE, arcname='edges.geojson') + zf.write(TEST_NODES_WITH_INVALID_INCLINE_FILE, arcname='nodes.geojson') + return zip_path class TestOSW2OSM(unittest.IsolatedAsyncioTestCase): @@ -105,6 +127,31 @@ def test_incline_tags_do_not_have_climb(self): os.remove(result.generated_files) + def test_invalid_incline_values_are_excluded(self): + zip_path = os.path.join(OUTPUT_DIR, 'dataset_with_invalid_incline.zip') + zip_file = _create_invalid_incline_zip(zip_path) + osw2osm = OSW2OSM(zip_file_path=zip_file, workdir=OUTPUT_DIR, prefix='invalid') + result = osw2osm.convert() + + # Ensure conversion succeeded so the XML file path is valid + self.assertTrue(result.status, msg=getattr(result, 'error', 'Conversion failed')) + xml_file_path = result.generated_files + + tree = ET.parse(xml_file_path) + root = tree.getroot() + + for way in root.findall('.//way'): + tags = {tag.get('k'): tag.get('v') for tag in way.findall('tag')} + if tags.get('_id') == '2': + self.assertNotIn('incline', tags) + if tags.get('_id') == '1': + self.assertEqual(tags.get('incline'), '0.1') + + self.assertEqual(len(root.findall(".//tag[@k='incline']")), 1) + + os.remove(result.generated_files) + os.remove(zip_file) + if __name__ == '__main__': unittest.main() diff --git a/tests/unit_tests/test_serializer/test_osm_normalizer.py b/tests/unit_tests/test_serializer/test_osm_normalizer.py index ec0a95e..b26076b 100644 --- a/tests/unit_tests/test_serializer/test_osm_normalizer.py +++ b/tests/unit_tests/test_serializer/test_osm_normalizer.py @@ -85,10 +85,10 @@ def test_removes_climb_without_incline(self): self.assertNotIn("climb", normalizer) self.assertNotIn("incline", normalizer) - def test_retains_non_numeric_incline_without_climb(self): + def test_discards_non_numeric_incline_without_climb(self): tags = {"highway": "footway", "incline": "steep"} normalizer = self.normalizer.filter_tags(tags) - self.assertEqual(normalizer["incline"], "steep") + self.assertNotIn("incline", normalizer) self.assertNotIn("climb", normalizer) def test_retains_climb_when_highway_is_steps(self): diff --git a/tests/unit_tests/test_serializer/test_osw_normalizer.py b/tests/unit_tests/test_serializer/test_osw_normalizer.py index fdd57ac..6dbf570 100644 --- a/tests/unit_tests/test_serializer/test_osw_normalizer.py +++ b/tests/unit_tests/test_serializer/test_osw_normalizer.py @@ -1,6 +1,21 @@ import unittest -from src.osm_osw_reformatter.serializer.osw.osw_normalizer import OSWWayNormalizer, OSWNodeNormalizer, \ - OSWPointNormalizer, tactile_paving, surface, crossing_markings, climb, _normalize +import importlib.util +from pathlib import Path + +module_path = Path(__file__).resolve().parents[3] / 'src/osm_osw_reformatter/serializer/osw/osw_normalizer.py' +spec = importlib.util.spec_from_file_location('osw_normalizer', module_path) +osw_normalizer = importlib.util.module_from_spec(spec) +spec.loader.exec_module(osw_normalizer) + +OSWWayNormalizer = osw_normalizer.OSWWayNormalizer +OSWNodeNormalizer = osw_normalizer.OSWNodeNormalizer +OSWPointNormalizer = osw_normalizer.OSWPointNormalizer +tactile_paving = osw_normalizer.tactile_paving +surface = osw_normalizer.surface +crossing_markings = osw_normalizer.crossing_markings +climb = osw_normalizer.climb +incline = osw_normalizer.incline +_normalize = osw_normalizer._normalize class TestOSWWayNormalizer(unittest.TestCase): @@ -29,6 +44,11 @@ def test_is_stairs(self): normalizer = OSWWayNormalizer(tags) self.assertTrue(normalizer.is_stairs()) + def test_is_stairs_with_invalid_climb(self): + tags = {'highway': 'steps', 'climb': 'left'} + normalizer = OSWWayNormalizer(tags) + self.assertTrue(normalizer.is_stairs()) + def test_is_pedestrian(self): tags = {'highway': 'pedestrian'} normalizer = OSWWayNormalizer(tags) @@ -53,6 +73,41 @@ def test_normalize_crossing(self): expected = {'highway': 'footway', 'footway': 'crossing', 'foot': 'yes'} self.assertEqual(result, expected) + def test_normalize_incline(self): + tags = {'highway': 'footway', 'incline': '10'} + normalizer = OSWWayNormalizer(tags) + result = normalizer.normalize() + expected = {'highway': 'footway', 'incline': 10.0, 'foot': 'yes'} + self.assertEqual(result, expected) + + def test_normalize_length(self): + tags = {'highway': 'footway', 'length': '12'} + normalizer = OSWWayNormalizer(tags) + result = normalizer.normalize() + expected = {'highway': 'footway', 'length': 12.0, 'foot': 'yes'} + self.assertEqual(result, expected) + + def test_normalize_stairs_keeps_climb(self): + tags = {'highway': 'steps', 'climb': 'down', 'step_count': '3'} + normalizer = OSWWayNormalizer(tags) + result = normalizer.normalize() + expected = {'highway': 'steps', 'climb': 'down', 'step_count': 3, 'foot': 'yes'} + self.assertEqual(result, expected) + + def test_normalize_stairs_defaults_highway_and_no_foot(self): + tags = {'climb': 'up'} + normalizer = OSWWayNormalizer(tags) + result = normalizer._normalize_stairs() + expected = {'climb': 'up', 'foot': 'yes'} + self.assertEqual(result, expected) + + def test_normalize_stairs_drops_invalid_climb(self): + tags = {'highway': 'steps', 'climb': 'left'} + normalizer = OSWWayNormalizer(tags) + result = normalizer.normalize() + expected = {'highway': 'steps', 'foot': 'yes'} + self.assertEqual(result, expected) + def test_normalize_invalid_way(self): tags = {'highway': 'invalid_type'} normalizer = OSWWayNormalizer(tags) @@ -121,11 +176,16 @@ def test_crossing_markings(self): self.assertEqual(crossing_markings('dots', {'crossing:markings': 'dots'}), 'dots') self.assertIsNone(crossing_markings('invalid_value', {'crossing:markings': 'invalid_value'})) - def test_incline(self): + def test_climb(self): self.assertEqual(climb('up', {}), 'up') self.assertEqual(climb('down', {}), 'down') self.assertIsNone(climb('invalid_value', {})) + def test_incline(self): + self.assertEqual(incline('10', {}), 10.0) + self.assertEqual(incline('0.5', {}), 0.5) + self.assertIsNone(incline('steep', {})) + class TestNormalizeWidthField(unittest.TestCase): def test_removes_width_when_value_is_nan_string(self):