Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Change log

### 0.2.11
- Retain numeric `incline` values and new `length` tags during way normalization
- Recognize any `highway=steps` way as stairs, preserving valid `climb` tags
- Add compliance test verifying `incline` survives OSW→OSM→OSW roundtrips

### 0.2.10
- Removed `climb` tag generation from OSM normalizer

Expand Down
8 changes: 5 additions & 3 deletions src/osm_osw_reformatter/serializer/osm/osm_normalizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ def filter_tags(self, tags):

if 'incline' in tags:
try:
incline_val = float(str(tags['incline']).rstrip('%'))
incline_val = float(str(tags['incline']))
except (ValueError, TypeError):
pass
# Drop the incline tag if it cannot be interpreted as a float
tags.pop('incline', '')
else:
# Normalise numeric incline values by casting to string
tags['incline'] = str(incline_val)

self._check_datatypes(tags)
Expand All @@ -75,4 +77,4 @@ def process_feature_post(self, osmgeometry, ogrfeature, ogrgeometry):
osm_id = int(osmgeometry.tags['_id'][0])

if osm_id is not None:
osmgeometry.id = osm_id
osmgeometry.id = osm_id
29 changes: 23 additions & 6 deletions src/osm_osw_reformatter/serializer/osw/osw_normalizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ class OSWWayNormalizer:
"trunk_link"
)

CLIMB_VALUES = (
"up",
"down",
)

def __init__(self, tags):
self.tags = tags

Expand Down Expand Up @@ -67,7 +72,16 @@ def normalize(self):
raise ValueError("This is an invalid way")

def _normalize_way(self, keep_keys={}, defaults = {}):
generic_keep_keys = {"highway": str, "width": float, "surface": surface, "name": str, "description": str, "foot": foot}
generic_keep_keys = {
"highway": str,
"width": float,
"surface": surface,
"name": str,
"description": str,
"foot": foot,
"incline": incline,
"length": float,
}
generic_defaults = {}

new_tags = _normalize(self.tags, {**generic_keep_keys, **keep_keys}, {**generic_defaults, **defaults})
Expand All @@ -81,7 +95,7 @@ def _normalize_pedestrian(self, keep_keys = {}, defaults = {}):
return new_tags

def _normalize_stairs(self, keep_keys = {}, defaults = {}):
generic_keep_keys = {"step_count": int, "incline": ["climb", climb]}
generic_keep_keys = {"step_count": int, "climb": climb}
generic_defaults = {"foot": "yes"}

new_tags = self._normalize_way({**generic_keep_keys, **keep_keys}, {**generic_defaults, **defaults})
Expand Down Expand Up @@ -584,13 +598,16 @@ def crossing_markings(tag_value, tags):
return None

def climb(tag_value, tags):
if tag_value.lower() not in (
"up",
"down"
):
if tag_value.lower() not in OSWWayNormalizer.CLIMB_VALUES:
return None
else:
return tag_value.lower()

def incline(tag_value, tags):
try:
return float(str(tag_value))
except (ValueError, TypeError):
return None

def foot(tag_value, tags):
if tag_value.lower() not in (
Expand Down
2 changes: 1 addition & 1 deletion src/osm_osw_reformatter/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.2.10'
__version__ = '0.2.11'
12 changes: 12 additions & 0 deletions tests/unit_tests/test_files/edges_invalid_incline.geojson
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{"type": "FeatureCollection", "features": [
{
"type": "Feature",
"geometry": {"type": "LineString", "coordinates": [[0.0, 0.0], [0.0, 1.0]]},
"properties": {"_id": "1", "_u_id": "1", "_v_id": "2", "highway": "footway", "foot": "yes", "incline": 0.1}
},
{
"type": "Feature",
"geometry": {"type": "LineString", "coordinates": [[0.0, 1.0], [0.0, 2.0]]},
"properties": {"_id": "2", "_u_id": "2", "_v_id": "3", "highway": "footway", "foot": "yes", "incline": "steep"}
}
]}
12 changes: 12 additions & 0 deletions tests/unit_tests/test_files/incline-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<osm version="0.6" generator="test" upload="false">
<node id="1" lat="0.0" lon="0.0" />
<node id="2" lat="0.0" lon="0.1" />
<way id="1">
<nd ref="1"/>
<nd ref="2"/>
<tag k="highway" v="footway"/>
<tag k="incline" v="0.1"/>
<tag k="_id" v="1"/>
</way>
</osm>
5 changes: 5 additions & 0 deletions tests/unit_tests/test_files/nodes_invalid_incline.geojson
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{"type": "FeatureCollection", "features": [
{"type": "Feature", "geometry": {"type": "Point", "coordinates": [0.0, 0.0]}, "properties": {"_id": "1"}},
{"type": "Feature", "geometry": {"type": "Point", "coordinates": [0.0, 1.0]}, "properties": {"_id": "2"}},
{"type": "Feature", "geometry": {"type": "Point", "coordinates": [0.0, 2.0]}, "properties": {"_id": "3"}}
]}
28 changes: 28 additions & 0 deletions tests/unit_tests/test_osm2osw/test_osm2osw.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
OUTPUT_DIR = os.path.join(os.path.dirname(os.path.dirname(ROOT_DIR)), 'output')
TEST_FILE = os.path.join(ROOT_DIR, 'test_files/wa.microsoft.osm.pbf')
TEST_WIDTH_FILE = os.path.join(ROOT_DIR, 'test_files/width-test.xml')
TEST_INCLINE_FILE = os.path.join(ROOT_DIR, 'test_files/incline-test.xml')


def is_valid_float(value):
Expand Down Expand Up @@ -218,6 +219,33 @@ async def run_test():

asyncio.run(run_test())

def test_retains_incline_tag(self):
osm_file_path = TEST_INCLINE_FILE

async def run_test():
osm2osw = OSM2OSW(osm_file=osm_file_path, workdir=OUTPUT_DIR, prefix='test')
result = await osm2osw.convert()
self.assertTrue(result.status)

found_incline = False
for file_path in result.generated_files:
if file_path.endswith('edges.geojson'):
with open(file_path) as f:
geojson = json.load(f)
for feature in geojson.get('features', []):
props = feature.get('properties', {})
if 'incline' in props:
self.assertIsInstance(props['incline'], (int, float))
found_incline = True
break

for file_path in result.generated_files:
os.remove(file_path)

self.assertTrue(found_incline, 'Incline tag not found in output edges')

asyncio.run(run_test())


if __name__ == '__main__':
unittest.main()
33 changes: 33 additions & 0 deletions tests/unit_tests/test_osm_compliance/test_osm_compliance.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,36 @@ async def test_output_is_osm_compliant(self):
os.remove(f)
os.remove(zip_path)
formatter.cleanup()

async def test_incline_tag_preserved(self):
osw2osm = OSW2OSM(
zip_file_path=TEST_DATA_WITH_INCLINE_ZIP_FILE,
workdir=OUTPUT_DIR,
prefix='incline'
)
result = osw2osm.convert()
osm_file = result.generated_files

formatter = Formatter(workdir=OUTPUT_DIR, file_path=osm_file, prefix='incline')
res = await formatter.osm2osw()
osw_files = res.generated_files

found_incline = False
for f in osw_files:
if f.endswith('.geojson'):
with open(f) as fh:
data = json.load(fh)
for feature in data.get('features', []):
props = feature.get('properties', {})
if 'incline' in props:
found_incline = True
break
if found_incline:
break

self.assertTrue(found_incline, 'No incline tag found in OSW output')

os.remove(osm_file)
for f in osw_files:
os.remove(f)
formatter.cleanup()
47 changes: 47 additions & 0 deletions tests/unit_tests/test_osw2osm/test_osw2osm.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import zipfile
import unittest
from src.osm_osw_reformatter.osw2osm.osw2osm import OSW2OSM
import xml.etree.ElementTree as ET
Expand All @@ -8,6 +9,27 @@
TEST_ZIP_FILE = os.path.join(ROOT_DIR, 'test_files/osw.zip')
TEST_WIDTH_ZIP_FILE = os.path.join(ROOT_DIR, 'test_files/width-test.zip')
TEST_DATA_WITH_INCLINE_ZIP_FILE = os.path.join(ROOT_DIR, 'test_files/dataset_with_incline.zip')
TEST_EDGES_WITH_INVALID_INCLINE_FILE = os.path.join(ROOT_DIR, 'test_files/edges_invalid_incline.geojson')
TEST_NODES_WITH_INVALID_INCLINE_FILE = os.path.join(ROOT_DIR, 'test_files/nodes_invalid_incline.geojson')


def _create_invalid_incline_zip(zip_path: str) -> str:
"""Create a temporary OSW dataset with invalid incline values.

The resulting ZIP mirrors the structure expected by ``OSW2OSM`` and contains
both ``edges.geojson`` and ``nodes.geojson`` files.

Args:
zip_path: Location where the archive will be written.

Returns:
The path to the generated ZIP archive.
"""
os.makedirs(os.path.dirname(zip_path), exist_ok=True)
with zipfile.ZipFile(zip_path, 'w') as zf:
zf.write(TEST_EDGES_WITH_INVALID_INCLINE_FILE, arcname='edges.geojson')
zf.write(TEST_NODES_WITH_INVALID_INCLINE_FILE, arcname='nodes.geojson')
return zip_path


class TestOSW2OSM(unittest.IsolatedAsyncioTestCase):
Expand Down Expand Up @@ -105,6 +127,31 @@ def test_incline_tags_do_not_have_climb(self):

os.remove(result.generated_files)

def test_invalid_incline_values_are_excluded(self):
zip_path = os.path.join(OUTPUT_DIR, 'dataset_with_invalid_incline.zip')
zip_file = _create_invalid_incline_zip(zip_path)
osw2osm = OSW2OSM(zip_file_path=zip_file, workdir=OUTPUT_DIR, prefix='invalid')
result = osw2osm.convert()

# Ensure conversion succeeded so the XML file path is valid
self.assertTrue(result.status, msg=getattr(result, 'error', 'Conversion failed'))
xml_file_path = result.generated_files

tree = ET.parse(xml_file_path)
root = tree.getroot()

for way in root.findall('.//way'):
tags = {tag.get('k'): tag.get('v') for tag in way.findall('tag')}
if tags.get('_id') == '2':
self.assertNotIn('incline', tags)
if tags.get('_id') == '1':
self.assertEqual(tags.get('incline'), '0.1')

self.assertEqual(len(root.findall(".//tag[@k='incline']")), 1)

os.remove(result.generated_files)
os.remove(zip_file)


if __name__ == '__main__':
unittest.main()
4 changes: 2 additions & 2 deletions tests/unit_tests/test_serializer/test_osm_normalizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ def test_removes_climb_without_incline(self):
self.assertNotIn("climb", normalizer)
self.assertNotIn("incline", normalizer)

def test_retains_non_numeric_incline_without_climb(self):
def test_discards_non_numeric_incline_without_climb(self):
tags = {"highway": "footway", "incline": "steep"}
normalizer = self.normalizer.filter_tags(tags)
self.assertEqual(normalizer["incline"], "steep")
self.assertNotIn("incline", normalizer)
self.assertNotIn("climb", normalizer)

def test_retains_climb_when_highway_is_steps(self):
Expand Down
66 changes: 63 additions & 3 deletions tests/unit_tests/test_serializer/test_osw_normalizer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
import unittest
from src.osm_osw_reformatter.serializer.osw.osw_normalizer import OSWWayNormalizer, OSWNodeNormalizer, \
OSWPointNormalizer, tactile_paving, surface, crossing_markings, climb, _normalize
import importlib.util
from pathlib import Path

module_path = Path(__file__).resolve().parents[3] / 'src/osm_osw_reformatter/serializer/osw/osw_normalizer.py'
spec = importlib.util.spec_from_file_location('osw_normalizer', module_path)
osw_normalizer = importlib.util.module_from_spec(spec)
spec.loader.exec_module(osw_normalizer)

OSWWayNormalizer = osw_normalizer.OSWWayNormalizer
OSWNodeNormalizer = osw_normalizer.OSWNodeNormalizer
OSWPointNormalizer = osw_normalizer.OSWPointNormalizer
tactile_paving = osw_normalizer.tactile_paving
surface = osw_normalizer.surface
crossing_markings = osw_normalizer.crossing_markings
climb = osw_normalizer.climb
incline = osw_normalizer.incline
_normalize = osw_normalizer._normalize


class TestOSWWayNormalizer(unittest.TestCase):
Expand Down Expand Up @@ -29,6 +44,11 @@ def test_is_stairs(self):
normalizer = OSWWayNormalizer(tags)
self.assertTrue(normalizer.is_stairs())

def test_is_stairs_with_invalid_climb(self):
tags = {'highway': 'steps', 'climb': 'left'}
normalizer = OSWWayNormalizer(tags)
self.assertTrue(normalizer.is_stairs())

def test_is_pedestrian(self):
tags = {'highway': 'pedestrian'}
normalizer = OSWWayNormalizer(tags)
Expand All @@ -53,6 +73,41 @@ def test_normalize_crossing(self):
expected = {'highway': 'footway', 'footway': 'crossing', 'foot': 'yes'}
self.assertEqual(result, expected)

def test_normalize_incline(self):
tags = {'highway': 'footway', 'incline': '10'}
normalizer = OSWWayNormalizer(tags)
result = normalizer.normalize()
expected = {'highway': 'footway', 'incline': 10.0, 'foot': 'yes'}
self.assertEqual(result, expected)

def test_normalize_length(self):
tags = {'highway': 'footway', 'length': '12'}
normalizer = OSWWayNormalizer(tags)
result = normalizer.normalize()
expected = {'highway': 'footway', 'length': 12.0, 'foot': 'yes'}
self.assertEqual(result, expected)

def test_normalize_stairs_keeps_climb(self):
tags = {'highway': 'steps', 'climb': 'down', 'step_count': '3'}
normalizer = OSWWayNormalizer(tags)
result = normalizer.normalize()
expected = {'highway': 'steps', 'climb': 'down', 'step_count': 3, 'foot': 'yes'}
self.assertEqual(result, expected)

def test_normalize_stairs_defaults_highway_and_no_foot(self):
tags = {'climb': 'up'}
normalizer = OSWWayNormalizer(tags)
result = normalizer._normalize_stairs()
expected = {'climb': 'up', 'foot': 'yes'}
self.assertEqual(result, expected)

def test_normalize_stairs_drops_invalid_climb(self):
tags = {'highway': 'steps', 'climb': 'left'}
normalizer = OSWWayNormalizer(tags)
result = normalizer.normalize()
expected = {'highway': 'steps', 'foot': 'yes'}
self.assertEqual(result, expected)

def test_normalize_invalid_way(self):
tags = {'highway': 'invalid_type'}
normalizer = OSWWayNormalizer(tags)
Expand Down Expand Up @@ -121,11 +176,16 @@ def test_crossing_markings(self):
self.assertEqual(crossing_markings('dots', {'crossing:markings': 'dots'}), 'dots')
self.assertIsNone(crossing_markings('invalid_value', {'crossing:markings': 'invalid_value'}))

def test_incline(self):
def test_climb(self):
self.assertEqual(climb('up', {}), 'up')
self.assertEqual(climb('down', {}), 'down')
self.assertIsNone(climb('invalid_value', {}))

def test_incline(self):
self.assertEqual(incline('10', {}), 10.0)
self.assertEqual(incline('0.5', {}), 0.5)
self.assertIsNone(incline('steep', {}))


class TestNormalizeWidthField(unittest.TestCase):
def test_removes_width_when_value_is_nan_string(self):
Expand Down
Loading