|
1 | 1 | import logging |
| 2 | +from copy import deepcopy |
2 | 3 | from typing import Dict, List, Optional, Tuple |
3 | 4 |
|
4 | 5 | import numpy as np |
| 6 | +from scipy.interpolate import splev, splprep |
5 | 7 |
|
6 | 8 | from py123d.conversion.utils.map_utils.opendrive.parser.lane import XODRRoadMark |
7 | 9 | from py123d.conversion.utils.map_utils.opendrive.parser.opendrive import XODR, Junction |
|
19 | 21 | lane_section_to_lane_helpers, |
20 | 22 | ) |
21 | 23 | from py123d.conversion.utils.map_utils.opendrive.utils.objects_helper import OpenDriveObjectHelper, get_object_helper |
| 24 | +from py123d.geometry.polyline import Polyline3D, PolylineSE2 |
| 25 | +from py123d.geometry.utils.polyline_utils import get_points_2d_yaws, offset_points_perpendicular |
22 | 26 |
|
23 | 27 | logger = logging.getLogger(__name__) |
24 | 28 |
|
@@ -80,9 +84,10 @@ def collect_element_helpers( |
80 | 84 | _deduplicate_connections(lane_helper_dict) |
81 | 85 | # 3.4. Remove invalid connections based on centerline distances |
82 | 86 | _post_process_connections(lane_helper_dict, connection_distance_threshold) |
83 | | - |
84 | 87 | # 3.5. Propagate speed limits to junction lanes (they often lack <type> elements) |
85 | 88 | _propagate_speed_limits_to_junction_lanes(lane_helper_dict, road_dict) |
| 89 | + # 3.6. Correct lanes with no connections |
| 90 | + _correct_lanes_with_no_connections(lane_helper_dict) |
86 | 91 |
|
87 | 92 | # 4. Collect lane groups from lane helpers |
88 | 93 | lane_group_helper_dict: Dict[str, OpenDriveLaneGroupHelper] = _collect_lane_groups( |
@@ -320,11 +325,182 @@ def _propagate_speed_limits_to_junction_lanes( |
320 | 325 | break |
321 | 326 |
|
322 | 327 |
|
| 328 | +def _extend_lane_with_shoulder( |
| 329 | + lane_helper: OpenDriveLaneHelper, |
| 330 | + shoulder_helper: OpenDriveLaneHelper, |
| 331 | + is_predecessor: bool, |
| 332 | +) -> OpenDriveLaneHelper: |
| 333 | + """ |
| 334 | +    Extend a lane's centerline and boundary polylines along the adjacent shoulder curve at the end that lacks a connection. |
| 335 | +
|
| 336 | + :param lane_helper: The lane to extend |
| 337 | + :param shoulder_helper: Adjacent shoulder lane providing the extension curve |
| 338 | +    :param is_predecessor: True to extend the start of the lane (missing predecessor), False to extend the end (missing successor) |
| 340 | + :return: New OpenDriveLaneHelper with extended polylines |
| 341 | + """ |
| 342 | + lane_center = lane_helper.center_polyline_se2.array |
| 343 | + if lane_center.shape[0] < 2: |
| 344 | + return lane_helper |
| 345 | + |
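| | +    # Resample a polyline at uniformly spaced arc-length distances so curves can be compared point-by-point. |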
| 346 | + def _sample_polyline_se2(polyline: PolylineSE2, count: int) -> np.ndarray: |
| 347 | + if count <= 1: |
| 348 | + return polyline.array[:count].copy() |
| 349 | + distances = np.linspace(0.0, polyline.length, num=count, dtype=np.float64) |
| 350 | + return np.array(polyline.interpolate(distances), dtype=np.float64) |
| 351 | + |
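| | +    # Signed lateral offsets: project target points onto the left-hand normals of the base curve. |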
| 352 | + def _signed_offsets(base_xy: np.ndarray, target_xy: np.ndarray, base_yaws: np.ndarray) -> np.ndarray: |
| 353 | + normals = np.stack( |
| 354 | + [np.cos(base_yaws + np.pi / 2.0), np.sin(base_yaws + np.pi / 2.0)], |
| 355 | + axis=-1, |
| 356 | + ) |
| 357 | + return np.einsum("ij,ij->i", target_xy - base_xy, normals) |
| 358 | + |
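| | +    # Estimate stable statistics from the ~30% of points at the end that keeps its existing connection. |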
| 359 | + count = lane_center.shape[0] |
| 360 | + stable_count = max(int(round(count * 0.3)), 2) |
| 361 | + stable_count = min(stable_count, count) |
| 362 | + stable_slice = slice(count - stable_count, count) if is_predecessor else slice(0, stable_count) |
| 363 | + |
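| | +    # Resample both shoulder boundaries and use the one closer to the lane centerline as the extension curve. |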
| 364 | + shoulder_inner = _sample_polyline_se2(shoulder_helper.inner_polyline_se2, count) |
| 365 | + shoulder_outer = _sample_polyline_se2(shoulder_helper.outer_polyline_se2, count) |
| 366 | + lane_center_xy = lane_center[:, :2] |
| 367 | + |
| 368 | + inner_dist = np.mean(np.linalg.norm(shoulder_inner[:, :2] - lane_center_xy, axis=1)) |
| 369 | + outer_dist = np.mean(np.linalg.norm(shoulder_outer[:, :2] - lane_center_xy, axis=1)) |
| 370 | + shoulder_sample = shoulder_inner if inner_dist <= outer_dist else shoulder_outer |
| 371 | + |
| 372 | + shoulder_yaws = shoulder_sample[:, 2] |
| 373 | + shoulder_xy = shoulder_sample[:, :2] |
| 374 | + offsets = _signed_offsets(shoulder_xy, lane_center_xy, shoulder_yaws) |
| 375 | + offset_mean = float(np.mean(offsets[stable_slice])) |
| 376 | + if np.isclose(offset_mean, 0.0): |
| 377 | + offset_mean = float(np.mean(offsets)) |
| 378 | + |
| 379 | + shoulder_offset_xy = offset_points_perpendicular(shoulder_sample, offset=offset_mean) |
| 380 | + |
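| | +    # Blend the laterally offset shoulder curve into the lane centerline with a smoothstep weight (3t^2 - 2t^3). |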
| 381 | + t = np.linspace(0.0, 1.0, count, dtype=np.float64) |
| 382 | + smooth = 3.0 * t**2 - 2.0 * t**3 |
| 383 | + weight = 1.0 - smooth if is_predecessor else smooth |
| 384 | + new_center_xy = (weight[:, None] * shoulder_offset_xy) + ((1.0 - weight)[:, None] * lane_center_xy) |
| 385 | + new_center_xy = new_center_xy.astype(np.float64, copy=False) |
| 386 | + |
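| | +    # Re-fit an interpolating cubic B-spline to smooth the blended centerline (skipped for degenerate geometry). |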
| 387 | + if count >= 4 and np.sum(np.linalg.norm(np.diff(new_center_xy, axis=0), axis=1)) > 1e-6: |
| 388 | + tck, _ = splprep(new_center_xy.T, s=0.0, k=min(3, count - 1)) |
| 389 | + u_new = np.linspace(0.0, 1.0, count, dtype=np.float64) |
| 390 | + new_center_xy = np.array(splev(u_new, tck), dtype=np.float64).T |
| 391 | + |
| 392 | + new_center_yaws = get_points_2d_yaws(new_center_xy) |
| 393 | + new_center_se2 = np.column_stack([new_center_xy, new_center_yaws]).astype(np.float64, copy=False) |
| 394 | + |
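| | +    # Rebuild inner/outer boundaries by offsetting the new centerline by half the mean lane width, keeping each boundary on its original side. |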
| 395 | + inner_xy = lane_helper.inner_polyline_se2.array[:, :2] |
| 396 | + outer_xy = lane_helper.outer_polyline_se2.array[:, :2] |
| 397 | + widths = np.linalg.norm(inner_xy - outer_xy, axis=1) |
| 398 | + width_mean = float(np.mean(widths[stable_slice])) |
| 399 | + if width_mean <= 1e-6: |
| 400 | + width_mean = float(np.mean(widths)) |
| 401 | + |
| 402 | + center_yaws = lane_center[:, 2] |
| 403 | + inner_offsets = _signed_offsets(lane_center_xy, inner_xy, center_yaws) |
| 404 | + inner_offset_mean = float(np.mean(inner_offsets[stable_slice])) |
| 405 | + if np.isclose(inner_offset_mean, 0.0): |
| 406 | + inner_offset_mean = float(np.mean(inner_offsets)) |
| 407 | + inner_sign = 1.0 if np.isclose(inner_offset_mean, 0.0) else float(np.sign(inner_offset_mean)) |
| 408 | + |
| 409 | + inner_offset = inner_sign * width_mean / 2.0 |
| 410 | + outer_offset = -inner_offset |
| 411 | + |
| 412 | + new_inner_xy = offset_points_perpendicular(new_center_se2, offset=inner_offset) |
| 413 | + new_outer_xy = offset_points_perpendicular(new_center_se2, offset=outer_offset) |
| 414 | + |
| 415 | + inner_yaws = get_points_2d_yaws(new_inner_xy) |
| 416 | + outer_yaws = get_points_2d_yaws(new_outer_xy) |
| 417 | + new_inner_se2 = np.column_stack([new_inner_xy, inner_yaws]) |
| 418 | + new_outer_se2 = np.column_stack([new_outer_xy, outer_yaws]) |
| 419 | + |
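| | +    # Keep the original elevation profiles for the extended boundaries. |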
| 420 | + inner_z = lane_helper.inner_polyline_3d.array[:, 2] |
| 421 | + outer_z = lane_helper.outer_polyline_3d.array[:, 2] |
| 422 | + new_inner_3d = np.column_stack([new_inner_xy, inner_z]) |
| 423 | + new_outer_3d = np.column_stack([new_outer_xy, outer_z]) |
| 424 | + |
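| | +    # Overwrite the polyline attributes directly in __dict__ (assumed to be cached properties on the helper). |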
| 425 | + new_helper = deepcopy(lane_helper) |
| 426 | + new_helper.__dict__["inner_polyline_se2"] = PolylineSE2.from_array(new_inner_se2) |
| 427 | + new_helper.__dict__["outer_polyline_se2"] = PolylineSE2.from_array(new_outer_se2) |
| 428 | + new_helper.__dict__["inner_polyline_3d"] = Polyline3D.from_array(new_inner_3d) |
| 429 | + new_helper.__dict__["outer_polyline_3d"] = Polyline3D.from_array(new_outer_3d) |
| 430 | + return new_helper |
| 431 | + |
| 432 | + |
| 433 | +def _correct_lanes_with_no_connections(lane_helper_dict: Dict[str, OpenDriveLaneHelper]) -> None: |
| 434 | + """ |
| 435 | +    Correct merge/exit lanes that have no predecessor or successor connections. |
| 436 | +    Extends their polylines along the adjacent shoulder curve and inherits connections from the adjacent driving lane. |
| 437 | +
|
| 438 | + :param lane_helper_dict: Dictionary mapping lane ids to their helper objects (modified in-place). |
| 439 | + """ |
| 440 | + lanes_to_update: Dict[str, OpenDriveLaneHelper] = {} |
| 441 | + |
| 442 | + for lane_id, lane_helper in lane_helper_dict.items(): |
| 443 | + if lane_helper.type != "driving": |
| 444 | + continue |
| 445 | + |
| 446 | + road_idx, lane_section_idx, _, lane_idx = lane_id.split("_") |
| 447 | + road_idx, lane_section_idx, lane_idx = int(road_idx), int(lane_section_idx), int(lane_idx) |
| 448 | + |
| 449 | + right_lane_id = build_lane_id(road_idx, lane_section_idx, lane_idx + 1) |
| 450 | + left_lane_id = build_lane_id(road_idx, lane_section_idx, lane_idx - 1) |
| 451 | + |
| 452 | + right_lane = lane_helper_dict.get(right_lane_id) |
| 453 | + left_lane = lane_helper_dict.get(left_lane_id) |
| 454 | + |
| 455 | +        # Identify the shoulder and driving lanes among the adjacent lanes |
| 456 | + shoulder, driving = None, None |
| 457 | + if left_lane and left_lane.type == "shoulder": |
| 458 | + shoulder = left_lane |
| 459 | + if right_lane and right_lane.type == "shoulder": |
| 460 | + shoulder = right_lane |
| 461 | + if left_lane and left_lane.type == "driving": |
| 462 | + driving = left_lane |
| 463 | + if right_lane and right_lane.type == "driving": |
| 464 | + driving = right_lane |
| 465 | + |
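| | +        # A lane missing connections at one end inherits them from the adjacent driving lane and is geometrically extended along the shoulder. |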
| 466 | + no_predecessor = len(lane_helper.predecessor_lane_ids) == 0 |
| 467 | + no_successor = len(lane_helper.successor_lane_ids) == 0 |
| 468 | + |
| 469 | + if no_predecessor and driving: |
| 470 | + if shoulder: |
| 471 | + new_helper = _extend_lane_with_shoulder(lane_helper, shoulder, is_predecessor=True) |
| 472 | +                new_helper.predecessor_lane_ids = list(driving.predecessor_lane_ids) |
| 473 | + for pred_id in new_helper.predecessor_lane_ids: |
| 474 | +                    pred_helper = lane_helper_dict.get(pred_id) |
| 475 | +                    if pred_helper is not None and lane_id not in pred_helper.successor_lane_ids: |
| | +                        pred_helper.successor_lane_ids.append(lane_id) |
| 476 | + lanes_to_update[lane_id] = new_helper |
| 477 | + else: |
| 478 | + print(f"Lane {lane_id} no predecessor: added {driving.lane_id}, no shoulder to extend") |
| 479 | + continue |
| 480 | + |
| 481 | + if no_successor and driving: |
| 482 | + # Use existing updated helper if we already created one for predecessor |
| 483 | + base_helper = lanes_to_update.get(lane_id, lane_helper) |
| 484 | + if shoulder: |
| 485 | + new_helper = _extend_lane_with_shoulder(base_helper, shoulder, is_predecessor=False) |
| 486 | +                new_helper.successor_lane_ids = list(driving.successor_lane_ids) |
| 487 | + for succ_id in new_helper.successor_lane_ids: |
| 488 | +                    succ_helper = lane_helper_dict.get(succ_id) |
| 489 | +                    if succ_helper is not None and lane_id not in succ_helper.predecessor_lane_ids: |
| | +                        succ_helper.predecessor_lane_ids.append(lane_id) |
| 490 | + lanes_to_update[lane_id] = new_helper |
| 491 | + else: |
| 492 | + print(f"Lane {lane_id} no successor: added {driving.lane_id}, no shoulder to extend") |
| 493 | + continue |
| 494 | + |
| 495 | + # Apply updates |
| 496 | + lane_helper_dict.update(lanes_to_update) |
| 497 | + |
| 498 | + |
323 | 499 | def _collect_lane_groups( |
324 | 500 | lane_helper_dict: Dict[str, OpenDriveLaneHelper], |
325 | 501 | junction_dict: Dict[int, Junction], |
326 | 502 | road_dict: Dict[int, XODRRoad], |
327 | | -) -> None: |
| 503 | +) -> Dict[str, OpenDriveLaneGroupHelper]: |
328 | 504 | lane_group_helper_dict: Dict[str, OpenDriveLaneGroupHelper] = {} |
329 | 505 |
|
330 | 506 | def _collect_lane_helper_of_id(lane_group_id: str) -> List[OpenDriveLaneHelper]: |
|