# wsdot_turn.py
# Summarize WSDOT roadway turn-lane points into per-intersection
# left/right turn-lane flags and class counts (Left / Right / Both / None).
from __future__ import annotations  # must be the first statement in the file

# Standard library
import os
from pathlib import Path
from typing import Any, Dict, Iterable, Optional, Union

# Third-party
import geopandas as gpd
import numpy as np
import pandas as pd

# Project-local
from ssr_tools.gen_utilities.gen_utilities import *
def summarize_turn_lane_intersections(
csv_path,
*,
x_col = "X",
y_col = "Y",
lr_col = "LeftRightInd",
tol_m = None,
keep_cols = None
):
df = pd.read_csv(csv_path)
# Normalize L/R indicator
df["_lr"] = (
df[lr_col].astype(str).str.upper().str.strip()
.where(lambda s: s.isin(["L", "R"]), np.nan)
)
# Grouping key for "intersection"
if tol_m is None:
# exact coordinates
df["_gx"] = df[x_col]
df["_gy"] = df[y_col]
else:
# snap to a grid at tol_m to cluster near-duplicates (works well for EPSG:3857)
df["_gx"] = (df[x_col] / tol_m).round().astype("Int64")
df["_gy"] = (df[y_col] / tol_m).round().astype("Int64")
g = df.groupby(["_gx", "_gy"], dropna=False)
# Representative center (median of member points)
centers = (
g[[x_col, y_col]].median()
.rename(columns={x_col: "X_rep", y_col: "Y_rep"})
.reset_index()
)
# Flags
has_left = g["_lr"].apply(lambda s: (s == "L").any()).rename("has_left").reset_index()
has_right = g["_lr"].apply(lambda s: (s == "R").any()).rename("has_right").reset_index()
intersections = (
centers
.merge(has_left, on=["_gx","_gy"], how="left")
.merge(has_right, on=["_gx","_gy"], how="left")
)
# Optional representative columns (first value in each group)
if keep_cols:
reps = g[keep_cols].agg("first").reset_index()
intersections = intersections.merge(reps, on=["_gx","_gy"], how="left")
# Classify
intersections["class"] = np.select(
[
intersections["has_left"] & intersections["has_right"],
intersections["has_left"] & ~intersections["has_right"],
~intersections["has_left"] & intersections["has_right"],
],
["Both", "Left", "Right"],
default="None",
)
# Counts
counts = (
intersections["class"]
.value_counts()
.reindex(["Left", "Right", "Both", "None"])
.fillna(0)
.astype(int)
)
intersections = intersections.drop(columns=["_gx","_gy"]).reset_index(drop=True)
return intersections, counts
def _main() -> None:
    """Run the turn-lane summary against the local WSDOT CSV export."""
    intersections, counts = summarize_turn_lane_intersections(
        "/Users/balmdale/Downloads/WSDOT_-_Roadway_Data_Turn_Lanes.csv",
        tol_m=None,  # or try tol_m=5.0 if XYs vary slightly per lane at the same node
        keep_cols=["RouteIdentifier"],  # optional
    )
    # find_frequency comes from the ssr_tools star import at the top of the file.
    find_frequency(intersections['class'])
    print(counts)
    # intersections.head()


if __name__ == "__main__":
    # Guard the hard-coded-path run so importing this module for its
    # function no longer triggers file I/O at import time.
    _main()