Skip to content
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Change log

### 0.2.11
- Retain numeric `incline` values and new `length` tags during way normalization
- Recognize any `highway=steps` way as stairs, preserving valid `climb` tags
- Add compliance test verifying `incline` survives OSW→OSM→OSW roundtrips

### 0.2.10
- Removed `climb` tag generation from OSM normalizer

Expand Down
8 changes: 5 additions & 3 deletions src/osm_osw_reformatter/serializer/osm/osm_normalizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ def filter_tags(self, tags):

if 'incline' in tags:
try:
incline_val = float(str(tags['incline']).rstrip('%'))
incline_val = float(str(tags['incline']))
except (ValueError, TypeError):
pass
# Drop the incline tag if it cannot be interpreted as a float
tags.pop('incline', '')
else:
# Normalise numeric incline values by casting to string
tags['incline'] = str(incline_val)

self._check_datatypes(tags)
Expand All @@ -75,4 +77,4 @@ def process_feature_post(self, osmgeometry, ogrfeature, ogrgeometry):
osm_id = int(osmgeometry.tags['_id'][0])

if osm_id is not None:
osmgeometry.id = osm_id
osmgeometry.id = osm_id
29 changes: 23 additions & 6 deletions src/osm_osw_reformatter/serializer/osw/osw_normalizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ class OSWWayNormalizer:
"trunk_link"
)

CLIMB_VALUES = (
"up",
"down",
)

def __init__(self, tags):
self.tags = tags

Expand Down Expand Up @@ -67,7 +72,16 @@ def normalize(self):
raise ValueError("This is an invalid way")

def _normalize_way(self, keep_keys={}, defaults = {}):
generic_keep_keys = {"highway": str, "width": float, "surface": surface, "name": str, "description": str, "foot": foot}
generic_keep_keys = {
"highway": str,
"width": float,
"surface": surface,
"name": str,
"description": str,
"foot": foot,
"incline": incline,
"length": float,
}
generic_defaults = {}

new_tags = _normalize(self.tags, {**generic_keep_keys, **keep_keys}, {**generic_defaults, **defaults})
Expand All @@ -81,7 +95,7 @@ def _normalize_pedestrian(self, keep_keys = {}, defaults = {}):
return new_tags

def _normalize_stairs(self, keep_keys = {}, defaults = {}):
generic_keep_keys = {"step_count": int, "incline": ["climb", climb]}
generic_keep_keys = {"step_count": int, "climb": climb}
generic_defaults = {"foot": "yes"}

new_tags = self._normalize_way({**generic_keep_keys, **keep_keys}, {**generic_defaults, **defaults})
Expand Down Expand Up @@ -584,13 +598,16 @@ def crossing_markings(tag_value, tags):
return None

def climb(tag_value, tags):
if tag_value.lower() not in (
"up",
"down"
):
if tag_value.lower() not in OSWWayNormalizer.CLIMB_VALUES:
return None
else:
return tag_value.lower()

def incline(tag_value, tags):
try:
return float(str(tag_value))
except (ValueError, TypeError):
return None

def foot(tag_value, tags):
if tag_value.lower() not in (
Expand Down
2 changes: 1 addition & 1 deletion src/osm_osw_reformatter/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.2.10'
__version__ = '0.2.11'
12 changes: 12 additions & 0 deletions tests/unit_tests/test_files/edges_invalid_incline.geojson
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{"type": "FeatureCollection", "features": [
{
"type": "Feature",
"geometry": {"type": "LineString", "coordinates": [[0.0, 0.0], [0.0, 1.0]]},
"properties": {"_id": "1", "_u_id": "1", "_v_id": "2", "highway": "footway", "foot": "yes", "incline": 0.1}
},
{
"type": "Feature",
"geometry": {"type": "LineString", "coordinates": [[0.0, 1.0], [0.0, 2.0]]},
"properties": {"_id": "2", "_u_id": "2", "_v_id": "3", "highway": "footway", "foot": "yes", "incline": "steep"}
}
]}
12 changes: 12 additions & 0 deletions tests/unit_tests/test_files/incline-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<osm version="0.6" generator="test" upload="false">
<node id="1" lat="0.0" lon="0.0" />
<node id="2" lat="0.0" lon="0.1" />
<way id="1">
<nd ref="1"/>
<nd ref="2"/>
<tag k="highway" v="footway"/>
<tag k="incline" v="0.1"/>
<tag k="_id" v="1"/>
</way>
</osm>
5 changes: 5 additions & 0 deletions tests/unit_tests/test_files/nodes_invalid_incline.geojson
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"type": "FeatureCollection", "features": [
{"type": "Feature", "geometry": {"type": "Point", "coordinates": [0.0, 0.0]}, "properties": {"_id": "1"}},
{"type": "Feature", "geometry": {"type": "Point", "coordinates": [0.0, 1.0]}, "properties": {"_id": "2"}},
{"type": "Feature", "geometry": {"type": "Point", "coordinates": [0.0, 2.0]}, "properties": {"_id": "3"}}
]}
28 changes: 28 additions & 0 deletions tests/unit_tests/test_osm2osw/test_osm2osw.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
OUTPUT_DIR = os.path.join(os.path.dirname(os.path.dirname(ROOT_DIR)), 'output')
TEST_FILE = os.path.join(ROOT_DIR, 'test_files/wa.microsoft.osm.pbf')
TEST_WIDTH_FILE = os.path.join(ROOT_DIR, 'test_files/width-test.xml')
TEST_INCLINE_FILE = os.path.join(ROOT_DIR, 'test_files/incline-test.xml')


def is_valid_float(value):
Expand Down Expand Up @@ -218,6 +219,33 @@ async def run_test():

asyncio.run(run_test())

def test_retains_incline_tag(self):
osm_file_path = TEST_INCLINE_FILE

async def run_test():
osm2osw = OSM2OSW(osm_file=osm_file_path, workdir=OUTPUT_DIR, prefix='test')
result = await osm2osw.convert()
self.assertTrue(result.status)

found_incline = False
for file_path in result.generated_files:
if file_path.endswith('edges.geojson'):
with open(file_path) as f:
geojson = json.load(f)
for feature in geojson.get('features', []):
props = feature.get('properties', {})
if 'incline' in props:
self.assertIsInstance(props['incline'], (int, float))
found_incline = True
break

for file_path in result.generated_files:
os.remove(file_path)

self.assertTrue(found_incline, 'Incline tag not found in output edges')

asyncio.run(run_test())


if __name__ == '__main__':
unittest.main()
33 changes: 33 additions & 0 deletions tests/unit_tests/test_osm_compliance/test_osm_compliance.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,36 @@ async def test_output_is_osm_compliant(self):
os.remove(f)
os.remove(zip_path)
formatter.cleanup()

async def test_incline_tag_preserved(self):
osw2osm = OSW2OSM(
zip_file_path=TEST_DATA_WITH_INCLINE_ZIP_FILE,
workdir=OUTPUT_DIR,
prefix='incline'
)
result = osw2osm.convert()
osm_file = result.generated_files

formatter = Formatter(workdir=OUTPUT_DIR, file_path=osm_file, prefix='incline')
res = await formatter.osm2osw()
osw_files = res.generated_files

found_incline = False
for f in osw_files:
if f.endswith('.geojson'):
with open(f) as fh:
data = json.load(fh)
for feature in data.get('features', []):
props = feature.get('properties', {})
if 'incline' in props:
found_incline = True
break
if found_incline:
break

self.assertTrue(found_incline, 'No incline tag found in OSW output')

os.remove(osm_file)
for f in osw_files:
os.remove(f)
formatter.cleanup()
47 changes: 47 additions & 0 deletions tests/unit_tests/test_osw2osm/test_osw2osm.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import zipfile
import unittest
from src.osm_osw_reformatter.osw2osm.osw2osm import OSW2OSM
import xml.etree.ElementTree as ET
Expand All @@ -8,6 +9,27 @@
TEST_ZIP_FILE = os.path.join(ROOT_DIR, 'test_files/osw.zip')
TEST_WIDTH_ZIP_FILE = os.path.join(ROOT_DIR, 'test_files/width-test.zip')
TEST_DATA_WITH_INCLINE_ZIP_FILE = os.path.join(ROOT_DIR, 'test_files/dataset_with_incline.zip')
TEST_EDGES_WITH_INVALID_INCLINE_FILE = os.path.join(ROOT_DIR, 'test_files/edges_invalid_incline.geojson')
TEST_NODES_WITH_INVALID_INCLINE_FILE = os.path.join(ROOT_DIR, 'test_files/nodes_invalid_incline.geojson')


def _create_invalid_incline_zip(zip_path: str) -> str:
"""Create a temporary OSW dataset with invalid incline values.

The resulting ZIP mirrors the structure expected by ``OSW2OSM`` and contains
both ``edges.geojson`` and ``nodes.geojson`` files.

Args:
zip_path: Location where the archive will be written.

Returns:
The path to the generated ZIP archive.
"""
os.makedirs(os.path.dirname(zip_path), exist_ok=True)
with zipfile.ZipFile(zip_path, 'w') as zf:
zf.write(TEST_EDGES_WITH_INVALID_INCLINE_FILE, arcname='edges.geojson')
zf.write(TEST_NODES_WITH_INVALID_INCLINE_FILE, arcname='nodes.geojson')
return zip_path


class TestOSW2OSM(unittest.IsolatedAsyncioTestCase):
Expand Down Expand Up @@ -105,6 +127,31 @@ def test_incline_tags_do_not_have_climb(self):

os.remove(result.generated_files)

def test_invalid_incline_values_are_excluded(self):
zip_path = os.path.join(OUTPUT_DIR, 'dataset_with_invalid_incline.zip')
zip_file = _create_invalid_incline_zip(zip_path)
osw2osm = OSW2OSM(zip_file_path=zip_file, workdir=OUTPUT_DIR, prefix='invalid')
result = osw2osm.convert()

# Ensure conversion succeeded so the XML file path is valid
self.assertTrue(result.status, msg=getattr(result, 'error', 'Conversion failed'))
xml_file_path = result.generated_files

tree = ET.parse(xml_file_path)
root = tree.getroot()

for way in root.findall('.//way'):
tags = {tag.get('k'): tag.get('v') for tag in way.findall('tag')}
if tags.get('_id') == '2':
self.assertNotIn('incline', tags)
if tags.get('_id') == '1':
self.assertEqual(tags.get('incline'), '0.1')

self.assertEqual(len(root.findall(".//tag[@k='incline']")), 1)

os.remove(result.generated_files)
os.remove(zip_file)


if __name__ == '__main__':
unittest.main()
4 changes: 2 additions & 2 deletions tests/unit_tests/test_serializer/test_osm_normalizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ def test_removes_climb_without_incline(self):
self.assertNotIn("climb", normalizer)
self.assertNotIn("incline", normalizer)

def test_retains_non_numeric_incline_without_climb(self):
def test_discards_non_numeric_incline_without_climb(self):
tags = {"highway": "footway", "incline": "steep"}
normalizer = self.normalizer.filter_tags(tags)
self.assertEqual(normalizer["incline"], "steep")
self.assertNotIn("incline", normalizer)
self.assertNotIn("climb", normalizer)

def test_retains_climb_when_highway_is_steps(self):
Expand Down
66 changes: 63 additions & 3 deletions tests/unit_tests/test_serializer/test_osw_normalizer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
import unittest
from src.osm_osw_reformatter.serializer.osw.osw_normalizer import OSWWayNormalizer, OSWNodeNormalizer, \
OSWPointNormalizer, tactile_paving, surface, crossing_markings, climb, _normalize
import importlib.util
from pathlib import Path

module_path = Path(__file__).resolve().parents[3] / 'src/osm_osw_reformatter/serializer/osw/osw_normalizer.py'
spec = importlib.util.spec_from_file_location('osw_normalizer', module_path)
osw_normalizer = importlib.util.module_from_spec(spec)
spec.loader.exec_module(osw_normalizer)

OSWWayNormalizer = osw_normalizer.OSWWayNormalizer
OSWNodeNormalizer = osw_normalizer.OSWNodeNormalizer
OSWPointNormalizer = osw_normalizer.OSWPointNormalizer
tactile_paving = osw_normalizer.tactile_paving
surface = osw_normalizer.surface
crossing_markings = osw_normalizer.crossing_markings
climb = osw_normalizer.climb
incline = osw_normalizer.incline
_normalize = osw_normalizer._normalize


class TestOSWWayNormalizer(unittest.TestCase):
Expand Down Expand Up @@ -29,6 +44,11 @@ def test_is_stairs(self):
normalizer = OSWWayNormalizer(tags)
self.assertTrue(normalizer.is_stairs())

def test_is_stairs_with_invalid_climb(self):
tags = {'highway': 'steps', 'climb': 'left'}
normalizer = OSWWayNormalizer(tags)
self.assertTrue(normalizer.is_stairs())

def test_is_pedestrian(self):
tags = {'highway': 'pedestrian'}
normalizer = OSWWayNormalizer(tags)
Expand All @@ -53,6 +73,41 @@ def test_normalize_crossing(self):
expected = {'highway': 'footway', 'footway': 'crossing', 'foot': 'yes'}
self.assertEqual(result, expected)

def test_normalize_incline(self):
tags = {'highway': 'footway', 'incline': '10'}
normalizer = OSWWayNormalizer(tags)
result = normalizer.normalize()
expected = {'highway': 'footway', 'incline': 10.0, 'foot': 'yes'}
self.assertEqual(result, expected)

def test_normalize_length(self):
tags = {'highway': 'footway', 'length': '12'}
normalizer = OSWWayNormalizer(tags)
result = normalizer.normalize()
expected = {'highway': 'footway', 'length': 12.0, 'foot': 'yes'}
self.assertEqual(result, expected)

def test_normalize_stairs_keeps_climb(self):
tags = {'highway': 'steps', 'climb': 'down', 'step_count': '3'}
normalizer = OSWWayNormalizer(tags)
result = normalizer.normalize()
expected = {'highway': 'steps', 'climb': 'down', 'step_count': 3, 'foot': 'yes'}
self.assertEqual(result, expected)

def test_normalize_stairs_defaults_highway_and_no_foot(self):
tags = {'climb': 'up'}
normalizer = OSWWayNormalizer(tags)
result = normalizer._normalize_stairs()
expected = {'climb': 'up', 'foot': 'yes'}
self.assertEqual(result, expected)

def test_normalize_stairs_drops_invalid_climb(self):
tags = {'highway': 'steps', 'climb': 'left'}
normalizer = OSWWayNormalizer(tags)
result = normalizer.normalize()
expected = {'highway': 'steps', 'foot': 'yes'}
self.assertEqual(result, expected)

def test_normalize_invalid_way(self):
tags = {'highway': 'invalid_type'}
normalizer = OSWWayNormalizer(tags)
Expand Down Expand Up @@ -121,11 +176,16 @@ def test_crossing_markings(self):
self.assertEqual(crossing_markings('dots', {'crossing:markings': 'dots'}), 'dots')
self.assertIsNone(crossing_markings('invalid_value', {'crossing:markings': 'invalid_value'}))

def test_incline(self):
def test_climb(self):
self.assertEqual(climb('up', {}), 'up')
self.assertEqual(climb('down', {}), 'down')
self.assertIsNone(climb('invalid_value', {}))

def test_incline(self):
self.assertEqual(incline('10', {}), 10.0)
self.assertEqual(incline('0.5', {}), 0.5)
self.assertIsNone(incline('steep', {}))


class TestNormalizeWidthField(unittest.TestCase):
def test_removes_width_when_value_is_nan_string(self):
Expand Down