Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
310 changes: 73 additions & 237 deletions utils/get_osm_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,10 @@
from argparse import ArgumentParser
import logging
import osmnx as ox
import networkx as nx
import matplotlib.patches as mpatches
import matplotlib.pyplot as plt
import pandas as pd
from shapely.geometry import MultiLineString, LineString
from shapely.ops import linemerge

RGBA_RED = (1, 0, 0, 0.3)
__version__ = "2025.1.16"

RGBA_RED = (1, 0, 0, 1)
RGBA_WHITE = (1, 1, 1, 1)

FLAGS_MOTORWAY = ["motorway", "motorway_link"]
Expand All @@ -45,171 +41,6 @@
"busway",
]


def merge_edges(
    graph: nx.DiGraph, previous_node: int, successive_node: int, node: int
) -> "dict | None":
    """
    Merge the edges previous_node -> node and node -> successive_node into one edge.
    The function merges the edges into a single edge if the following conditions are met:
    - both edges exist (the edge stored under key 0 of each node pair is used)
    - the name of one edge is contained in the name of the other
    - the number of lanes is the same (a missing "lanes" attribute counts as 1)
    - both edges carry a "geometry" and a "length" attribute
    - the coordinates of previous_node and successive_node are in the merged geometry
    Note: as a side effect, a default "lanes" value of 1 is written into the edge
    data of the graph for edges that lack the attribute.
    Parameters:
    ----------------
    graph (networkx.DiGraph): the graph
    previous_node (int): the previous node
    successive_node (int): the successive node
    node (int): the id of the node which will be removed
    Returns:
    ----------------
    dict: a copy of the first edge's attributes with "length" (sum of both edges),
    "lanes" (as int) and "geometry" (LineString from previous_node to
    successive_node) replaced, or None if the two edges cannot be merged
    """
    edges_in = graph.get_edge_data(previous_node, node)
    edges_out = graph.get_edge_data(node, successive_node)
    if not edges_in or not edges_out:
        # one of the two edges does not exist in this direction
        # (e.g. the road is a one-way road): nothing to merge
        return None
    data_u = edges_in.get(0)
    data_v = edges_out.get(0)
    if data_u is None or data_v is None:
        return None

    data_u.setdefault("lanes", 1)
    data_v.setdefault("lanes", 1)
    name_u = data_u.get("name")
    name_v = data_v.get("name")
    try:
        same_street = name_u in name_v or name_v in name_u
        # compare lanes as integers so that "2" and 2 are the same lane count
        lanes_u = int(data_u["lanes"])
        lanes_v = int(data_v["lanes"])
    except (TypeError, ValueError):
        # missing/non-comparable names or lane values that are not a single
        # integer (lists, NaN, "2;3", ...): do not merge
        return None
    if not same_street or lanes_u != lanes_v:
        return None

    geometry_u = data_u.get("geometry")
    geometry_v = data_v.get("geometry")
    if (
        geometry_u is None
        or geometry_v is None
        or "length" not in data_u
        or "length" not in data_v
    ):
        return None

    # merge the linestrings
    merged = geometry_u.union(geometry_v)
    if isinstance(merged, MultiLineString):
        merged = linemerge(merged)
    if isinstance(merged, LineString):
        coords = list(merged.coords)
    else:
        # If it's still a multi-part geometry, collect the coordinates of its parts.
        # Multi-part geometries are not iterable in Shapely 2.0: use .geoms
        parts = getattr(merged, "geoms", None)
        if parts is None:
            return None
        coords = [point for line in parts for point in line.coords]

    # take the list from coordinates of previous_node to coordinates of successive_node
    u_coords = (graph.nodes[previous_node]["x"], graph.nodes[previous_node]["y"])
    v_coords = (
        graph.nodes[successive_node]["x"],
        graph.nodes[successive_node]["y"],
    )
    if u_coords not in coords or v_coords not in coords:
        return None
    u_index = coords.index(u_coords)
    v_index = coords.index(v_coords)
    if u_index <= v_index:
        # the end point must be included: slices exclude their upper bound
        segment = coords[u_index : v_index + 1]
    else:
        # the merged line runs from v to u: cut it and flip it
        segment = coords[v_index : u_index + 1][::-1]
    if len(segment) < 2:
        # previous_node and successive_node share the same coordinates:
        # a LineString needs two points, so fall back to an empty geometry
        segment = []

    edge_uv = data_u.copy()
    # set length as the sum
    edge_uv["length"] = data_u["length"] + data_v["length"]
    edge_uv["lanes"] = lanes_u
    edge_uv["geometry"] = LineString(segment)
    return edge_uv


def _lanes_as_int(value) -> int:
    """Return a lane attribute as an int (max of a list, 0 if it cannot be parsed)."""
    if isinstance(value, (list, tuple)):
        return max((_lanes_as_int(item) for item in value), default=0)
    try:
        return int(value)
    except (TypeError, ValueError):
        return 0


def simplify_graph(graph_original: nx.DiGraph) -> nx.DiGraph:
    """
    Simplify the graph by removing nodes that have only two neighbors and are actually
    the same street.
    The function merges the edges into a single edge, repeating the process until the
    number of nodes no longer changes. Afterwards it keeps only the largest weakly
    connected component, removes self-loops and, for parallel edges with the same
    u and v, keeps only the one with the highest number of lanes.
    The input graph is not modified: the work is done on a copy.
    Parameters:
    ----------------
    graph_original (networkx.DiGraph): the graph to simplify
    Returns:
    ----------------
    networkx.DiGraph: the simplified graph
    """
    graph = graph_original.copy()
    previous_nodes = 0
    while previous_nodes != len(graph.nodes):
        logging.info("New cycle: current_nodes=%d", len(graph.nodes))
        previous_nodes = len(graph.nodes)
        # iterate over a snapshot, since nodes are removed inside the loop
        for node in list(graph.nodes):
            # define neighbours as list of predecessors and successors
            # NOTE(review): this is a list, so a node on a two-way street
            # (predecessors == successors == [u, v]) has 4 entries and is skipped;
            # confirm whether a set of distinct neighbours was intended
            neighbours = list(graph.predecessors(node)) + list(graph.successors(node))
            if (
                len(neighbours) != 2
                or graph.in_degree(node) != graph.out_degree(node)
                or graph.in_degree(node) > 2
            ):
                continue
            u, v = neighbours
            if graph.has_edge(u, v):
                continue
            edge_uv = merge_edges(graph, u, v, node)
            edge_vu = merge_edges(graph, v, u, node)
            if edge_uv is None and edge_vu is None:
                continue
            # keep every attribute of the merged edge (lanes, highway, oneway, ...),
            # not only length, name and geometry
            if edge_uv is not None:
                graph.add_edge(u, v, **edge_uv)
            if edge_vu is not None:
                graph.add_edge(v, u, **edge_vu)
            graph.remove_node(node)

    # Remove all nodes that are not in the giant component
    components = list(nx.weakly_connected_components(graph))
    if components:
        graph.remove_nodes_from(set(graph.nodes) - max(components, key=len))
    # remove all self-loops
    graph.remove_edges_from(list(nx.selfloop_edges(graph)))

    # check if there are edges with same u and v. If true, keep only the one with the
    # bigger lanes. Only multigraphs can hold parallel edges.
    if graph.is_multigraph():
        edges_to_remove = []
        best_edges = {}  # (u, v) -> (lanes, key) of the edge currently kept
        for u, v, key, data in graph.edges(keys=True, data=True):
            lanes = _lanes_as_int(data.get("lanes", 0))
            if (u, v) not in best_edges:
                best_edges[(u, v)] = (lanes, key)
                continue
            kept_lanes, kept_key = best_edges[(u, v)]
            if lanes > kept_lanes:
                # the current edge has more lanes: drop the one kept so far
                edges_to_remove.append((u, v, kept_key))
                best_edges[(u, v)] = (lanes, key)
            else:
                # the current edge has fewer (or equal) lanes: drop it
                edges_to_remove.append((u, v, key))
        # remove by key so that exactly the chosen parallel edge is deleted
        graph.remove_edges_from(edges_to_remove)

    return graph


if __name__ == "__main__":
parser = ArgumentParser("Script to get the OSM data of a place.")
parser.add_argument(
Expand All @@ -220,12 +51,24 @@
parser.add_argument(
"--exclude-motorway",
action="store_true",
help="Exclude motorways from the data",
help="Exclude motorways from the data. Default is False",
)
parser.add_argument(
"--exclude-residential",
action="store_true",
help="Exclude residential roads from the data",
help="Exclude residential roads from the data. Default is False",
)
parser.add_argument(
"--allow-duplicates",
action="store_true",
help="Allow duplicated edges in the data. Default is False",
)
parser.add_argument(
"-t",
"--tolerance",
type=int,
default=20,
help="Radius in meters to merge intersections. For more info, see osmnx documentation.",
)
parser = parser.parse_args()
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
Expand All @@ -240,78 +83,46 @@
logging.ERROR, f"\033[1;31m{logging.getLevelName(logging.ERROR)}\033[1;0m"
)

# get the street network for the place given on the command line
G_ALL = ox.graph_from_place(parser.place, network_type="drive")
logging.info("Welcome to get_osm_data.py v%s", __version__)

# define CUSTOM_FILTER basing on FLAGS and args
FLAGS = FLAGS_NORMAL
if not parser.exclude_motorway:
FLAGS += FLAGS_MOTORWAY
if not parser.exclude_residential:
FLAGS += FLAGS_RESIDENTIAL
CUSTOM_FILTER = f"[\"highway\"~\"{'|'.join(FLAGS)}\"]"
logging.info("Custom filter: %s", CUSTOM_FILTER)
GRAPH = ox.graph_from_place(parser.place, network_type="drive")
ox.plot_graph(GRAPH, show=False, close=True, save=True, filepath="./original.png")
logging.info(
"Graph created with %d nodes and %d edges.", len(G_ALL.nodes), len(G_ALL.edges)
)

gdf_nodes, gdf_edges = ox.graph_to_gdfs(G_ALL)
gdf_edges["highway"] = gdf_edges["highway"].apply(
lambda x: x[-1] if isinstance(x, list) else x
)
if "lanes" not in gdf_edges.columns:
gdf_edges["lanes"] = 1
gdf_edges["lanes"] = gdf_edges["lanes"].apply(
lambda x: max(x) if isinstance(x, list) else 1 if pd.isna(x) else x
"Original network has %d nodes and %d edges.",
len(GRAPH.nodes),
len(GRAPH.edges),
)
gdf_edges["name"] = gdf_edges["name"].apply(
lambda x: " ".join(x) if isinstance(x, list) else " " if pd.isna(x) else x
GRAPH = ox.graph_from_place(
parser.place, network_type="drive", custom_filter=CUSTOM_FILTER
)
# gdf_edges = gdf_edges[~gdf_edges["access"].isin(["no", "private"])]

# Make a plot to visualize the removed links
removed_patch = mpatches.Patch(color=RGBA_RED, label="Removed Nodes and Edges")

if parser.exclude_motorway:
gdf_edges = gdf_edges[~gdf_edges["highway"].isin(FLAGS_MOTORWAY)]
if parser.exclude_residential:
gdf_edges = gdf_edges[~gdf_edges["highway"].isin(FLAGS_RESIDENTIAL)]

# rebuild the graph
G = ox.graph_from_gdfs(gdf_nodes, gdf_edges)
G.remove_nodes_from(list(nx.isolates(G)))
logging.info(
"Graph filtered: now it has %d nodes and %d edges.", len(G.nodes), len(G.edges)
"Custom filtered graph has %d nodes and %d edges.",
len(GRAPH.nodes),
len(GRAPH.edges),
)
G = simplify_graph(G)
logging.info(
"Graph simplified: now it has %d nodes and %d edges.",
len(G.nodes),
len(G.edges),
GRAPH = ox.consolidate_intersections(
ox.project_graph(GRAPH), tolerance=parser.tolerance
)
# assert that graph has not isolated nodes
assert not list(nx.isolates(G))
# assert that graph has not self-loops
assert not list(nx.selfloop_edges(G))

fig, ax = ox.plot_graph(
G_ALL,
node_color=[
RGBA_RED if node not in G.nodes else RGBA_WHITE for node in G_ALL.nodes
],
edge_color=[
RGBA_RED if edge not in G.edges else RGBA_WHITE for edge in G_ALL.edges
],
show=False,
close=False,
logging.info(
"Consolidated graph has %d nodes and %d edges.",
len(GRAPH.nodes),
len(GRAPH.edges),
)
ax.legend(handles=[removed_patch])
fig.set_size_inches(16, 9)
plt.savefig("removed_nodes_and_edges.png")

# Plot resulting graph
fig, ax = ox.plot_graph(G, show=False, close=False)
fig.set_size_inches(16, 9)
plt.savefig("final_graph.png")

gdf_nodes, gdf_edges = ox.graph_to_gdfs(G)
# plot graph on a 16x9 figure and save into file
ox.plot_graph(GRAPH, show=False, close=True, save=True, filepath="./final.png")
gdf_nodes, gdf_edges = ox.graph_to_gdfs(ox.project_graph(GRAPH, to_latlong=True))

Check warning

Code scanning / Pylint (reported by Codacy)

Constant name "gdf_nodes" doesn't conform to UPPER_CASE naming style Warning

Constant name "gdf_nodes" doesn't conform to UPPER_CASE naming style

Check warning

Code scanning / Pylint (reported by Codacy)

Constant name "gdf_edges" doesn't conform to UPPER_CASE naming style Warning

Constant name "gdf_edges" doesn't conform to UPPER_CASE naming style
# notice that osmid is the index of the gdf_nodes DataFrame, so take it as a column
gdf_nodes.reset_index(inplace=True)
gdf_edges.reset_index(inplace=True)

# assert that there are no edges with the same u and v
assert not gdf_edges.duplicated(subset=["u", "v"]).any()
# Prepare node dataframe
gdf_nodes = gdf_nodes[["osmid", "x", "y", "highway"]]
# Prepare edge dataframe
Expand All @@ -320,6 +131,31 @@
gdf_edges = gdf_edges[
["u", "v", "length", "oneway", "lanes", "highway", "maxspeed", "name"]
]
if parser.allow_duplicates:
N_DUPLICATES = 0
else:
# Check for duplicate edges
duplicated_mask = gdf_edges.duplicated(subset=["u", "v"])

Check warning

Code scanning / Pylint (reported by Codacy)

Constant name "duplicated_mask" doesn't conform to UPPER_CASE naming style Warning

Constant name "duplicated_mask" doesn't conform to UPPER_CASE naming style
N_DUPLICATES = duplicated_mask.sum()

if N_DUPLICATES > 0:
logging.warning(
"There are %d duplicated edges which will be removed. "
"Please look at them in the promped plot.",
N_DUPLICATES,
)
# Plot the graph with duplicated edges in red
edge_colors = [

Check warning

Code scanning / Pylint (reported by Codacy)

Constant name "edge_colors" doesn't conform to UPPER_CASE naming style Warning

Constant name "edge_colors" doesn't conform to UPPER_CASE naming style
RGBA_RED if duplicated_mask.iloc[i] else RGBA_WHITE
for i in range(len(gdf_edges))
]
ox.plot_graph(GRAPH, edge_color=edge_colors)

# Remove duplicated edges
gdf_edges = gdf_edges.drop_duplicates(subset=["u", "v"])

Check warning

Code scanning / Pylint (reported by Codacy)

Constant name "gdf_edges" doesn't conform to UPPER_CASE naming style Warning

Constant name "gdf_edges" doesn't conform to UPPER_CASE naming style
# Save the data
gdf_nodes.to_csv("nodes.csv", sep=";", index=False)
gdf_edges.to_csv("edges.csv", sep=";", index=False)
place = parser.place.split(",")[0].strip().lower()

Check warning

Code scanning / Pylint (reported by Codacy)

Constant name "place" doesn't conform to UPPER_CASE naming style Warning

Constant name "place" doesn't conform to UPPER_CASE naming style
gdf_nodes.to_csv(f"{place}_nodes.csv", sep=";", index=False)
logging.info('Nodes correctly saved in "%s_nodes.csv"', place)
gdf_edges.to_csv(f"{place}_edges.csv", sep=";", index=False)
logging.info('Edges correctly saved in "%s_edges.csv"', place)
Loading