|
5 | 5 | import pycountry |
6 | 6 | from geoalchemy2 import WKTElement |
7 | 7 | from geoalchemy2.shape import to_shape |
| 8 | +from pyproj import Geod |
| 9 | +from shapely.geometry.geo import mapping |
8 | 10 | from sqlalchemy.orm import Session |
9 | 11 |
|
10 | 12 | from shared.database_gen.sqlacodegen_models import ( |
|
18 | 20 | from shared.helpers.locations import get_geopolygons_covers |
19 | 21 |
|
20 | 22 | ERROR_STATUS_CODE = 299 # Custom error code for the function to avoid retries |
| 23 | +GEOD = Geod(ellps="WGS84") # Geod object for geodesic calculations |
21 | 24 |
|
22 | 25 |
|
23 | 26 | def generate_color( |
@@ -165,17 +168,66 @@ def extract_location_aggregate( |
165 | 168 | ) |
166 | 169 |
|
167 | 170 |
|
| 171 | +def geodesic_area_m2(geom) -> float: |
| 172 | + """Return absolute geodesic area in m² for a Shapely geometry (Polygon/MultiPolygon).""" |
| 173 | + # pyproj can handle GeoJSON-like mappings, including MultiPolygons |
| 174 | + area, _perim = GEOD.geometry_area_perimeter(mapping(geom)) |
| 175 | + return abs(area) |
| 176 | + |
| 177 | + |
| 178 | +def resolve_same_level_conflicts(items: List[Geopolygon]) -> Geopolygon: |
| 179 | + # 1) Prefer iso_3166_2_code if present |
| 180 | + candidates = [g for g in items if g.iso_3166_2_code] or items |
| 181 | + |
| 182 | + if len(candidates) > 1: |
| 183 | + # 2) Prefer the smallest geodesic area (m²) |
| 184 | + def area_for(g: Geopolygon) -> float: |
| 185 | + try: |
| 186 | + geom = to_shape(g.geometry) # -> Shapely geometry (in EPSG:4326) |
| 187 | + return geodesic_area_m2(geom) |
| 188 | + except Exception: |
| 189 | + # If geometry missing/bad, push to the end |
| 190 | + return float("inf") |
| 191 | + |
| 192 | + candidates.sort(key=area_for) |
| 193 | + |
| 194 | + # 3) Prefer with name, then lowest OSM id |
| 195 | + candidates.sort(key=lambda g: (0 if (g.name and g.name.strip()) else 1, g.osm_id)) |
| 196 | + return candidates[0] |
| 197 | + |
| 198 | + |
| 199 | +def dedupe_by_admin_level(geopolygons: List[Geopolygon], logger) -> List[Geopolygon]: |
| 200 | + """Keep one polygon per admin_level using tie-break rules above.""" |
| 201 | + by_level: dict[int, List[Geopolygon]] = {} |
| 202 | + for g in geopolygons: |
| 203 | + by_level.setdefault(g.admin_level, []).append(g) |
| 204 | + |
| 205 | + winners: List[Geopolygon] = [] |
| 206 | + for level, items in sorted( |
| 207 | + by_level.items(), key=lambda kv: kv[0] |
| 208 | + ): # keep order by level |
| 209 | + if len(items) > 1: |
| 210 | + logger.debug( |
| 211 | + "Tie at admin_level=%s among osm_ids=%s", |
| 212 | + level, |
| 213 | + [i.osm_id for i in items], |
| 214 | + ) |
| 215 | + winners.append(resolve_same_level_conflicts(items)) |
| 216 | + return winners |
| 217 | + |
| 218 | + |
168 | 219 | def extract_location_aggregate_geopolygons( |
169 | 220 | stop_point: WKTElement, geopolygons, logger, db_session: Session |
170 | 221 | ) -> Optional[GeopolygonAggregate]: |
171 | 222 | admin_levels = {g.admin_level for g in geopolygons} |
172 | | - if len(admin_levels) != len(geopolygons): |
| 223 | + # If duplicates per admin_level exist, resolve instead of returning None |
| 224 | + if admin_levels != len(geopolygons): |
173 | 225 | logger.warning( |
174 | 226 | "Duplicate admin levels for point: %s -> %s", |
175 | 227 | stop_point, |
176 | 228 | geopolygons_as_string(geopolygons), |
177 | 229 | ) |
178 | | - return None |
| 230 | + geopolygons = dedupe_by_admin_level(geopolygons, logger) |
179 | 231 |
|
180 | 232 | valid_iso_3166_1 = any(g.iso_3166_1_code for g in geopolygons) |
181 | 233 | valid_iso_3166_2 = any(g.iso_3166_2_code for g in geopolygons) |
|
0 commit comments