11from collections import defaultdict
2- from typing import List
2+ from typing import Any , List , Optional
33
44import networkx as nx
55import overpy
1616 get_additional_signals ,
1717 get_opposite_edge_pairs ,
1818 get_signal_classification_number ,
19+ get_signal_direction ,
1920 get_signal_function ,
2021 get_signal_kind ,
2122 get_signal_name ,
2223 get_signal_states ,
23- getSignalDirection ,
2424 is_end_node ,
2525 is_same_edge ,
2626 is_signal ,
@@ -35,34 +35,41 @@ def __init__(self):
3535 self .top_nodes : list [OverpyNode ] = []
3636 self .node_data : dict [str , OverpyNode ] = {}
3737 self .ways : dict [str , List [overpy .Way ]] = defaultdict (list )
38- self .paths : dict [str , List [List ]] = defaultdict (list )
38+ self .paths : dict [tuple [ Optional [ Any ], Optional [ Any ]] , List [List ]] = defaultdict (list )
3939 self .api = overpy .Overpass (url = "https://osm.hpi.de/overpass/api/interpreter" )
4040 self .topology = Topology ()
4141
42- def _get_track_objects (self , polygon : str ):
43- query = f'(way["railway"="rail"](poly: "{ polygon } ");node(w)(poly: "{ polygon } "););out body;'
42+ def _get_track_objects (self , polygon : str , railway_option_types : list [str ]):
43+ query_parts = ""
44+ for _type in railway_option_types :
45+ query_parts = (
46+ query_parts
47+ + f'way["railway"="{ _type } "](poly: "{ polygon } ");node(w)(poly: "{ polygon } ");'
48+ )
49+ query = f"({ query_parts } );out body;"
50+ print (query )
4451 return self ._query_api (query )
4552
4653 def _query_api (self , query ):
4754 result = self .api .query (query )
4855 return result
4956
5057 def _build_graph (self , track_objects ):
51- G = nx .Graph ()
58+ graph = nx .Graph ()
5259 for way in track_objects .ways :
5360 previous_node = None
5461 for idx , node_id in enumerate (way ._node_ids ):
5562 try :
5663 node = track_objects .get_node (node_id )
5764 self .node_data [node_id ] = node
5865 self .ways [str (node_id )].append (way )
59- G .add_node (node .id )
66+ graph .add_node (node .id )
6067 if previous_node :
61- G .add_edge (previous_node .id , node .id )
68+ graph .add_edge (previous_node .id , node .id )
6269 previous_node = node
6370 except overpy .exception .DataIncomplete :
6471 continue
65- return G
72+ return graph
6673
6774 def _get_next_top_node (self , node , edge : "tuple[str, str]" , path ):
6875 node_to_id = edge [1 ]
@@ -111,7 +118,7 @@ def _add_signals(self, path, edge: model.Edge, node_before, node_after):
111118 signal_geo_point
112119 ),
113120 side_distance = dist_edge (node_before , node_after , node ),
114- direction = getSignalDirection (
121+ direction = get_signal_direction (
115122 edge , self .ways , path , node .tags ["railway:signal:direction" ]
116123 ),
117124 function = get_signal_function (node ),
@@ -143,15 +150,18 @@ def _should_add_edge(self, node_a: model.Node, node_b: model.Node, path: list[in
143150 present_paths = self .paths [(node_a , node_b )] + self .paths [(node_b , node_a )]
144151 return path not in present_paths and reversed_path not in present_paths
145152
146- def run (self , polygon ):
147- track_objects = self ._get_track_objects (polygon )
153+ def run (self , polygon , railway_option_types : list [str ] = None ):
154+ if railway_option_types is None :
155+ railway_option_types = ["rail" ]
156+ track_objects = self ._get_track_objects (polygon , railway_option_types )
148157 self .graph = self ._build_graph (track_objects )
149158
150- # ToDo: Check whether all edges really link to each other in ORM or if there might be edges missing for nodes that are just a few cm from each other
159+ # ToDo: Check whether all edges really link to each other in ORM or if there might be
160+ # edges missing for nodes that are just a few cm from each other
151161 # Only nodes with max 1 edge or that are a switch can be top nodes
152162 for node_id in self .graph .nodes :
153163 node = self .node_data [node_id ]
154- if is_end_node (node , self .graph ) or is_switch (node ):
164+ if is_end_node (node , self .graph ) or is_switch (node , self . graph ):
155165 self .top_nodes .append (node )
156166
157167 for node in self .top_nodes :
@@ -201,7 +211,8 @@ def run(self, polygon):
201211 e for e in self .topology .edges .values () if e .node_a == node or e .node_b == node
202212 ]
203213
204- # merge edges, this means removing the switch and allowing only one path for each origin
214+ # merge edges, this means removing the switch and
215+ # allowing only one path for each origin
205216 edge_pair_1 , edge_pair_2 = get_opposite_edge_pairs (connected_edges , node )
206217 new_edge_1 = merge_edges (* edge_pair_1 , node )
207218 new_edge_2 = merge_edges (* edge_pair_2 , node )
@@ -226,8 +237,10 @@ def run(self, polygon):
226237 except DataIncomplete :
227238 nodes = way .get_nodes (resolve_missing = True )
228239 for candidate in nodes :
229- # we are only interested in nodes outside the bounding box as every node
230- # that has been previously known was already visited as part of the graph
240+ # we are only interested in nodes outside the
241+ # bounding box as every node that has been
242+ # previously known was already visited as
243+ # part of the graph
231244 if (
232245 candidate .id != int (node .name )
233246 and candidate .id not in self .node_data .keys ()
@@ -247,9 +260,9 @@ def run(self, polygon):
247260 break
248261 if not substitute_found :
249262 # if no substitute was found, the third node seems to be inside the bounding box
250- # this can happen when a node is connected to the same node twice (e.g. station on
251- # lines with only one track). WARNING: this produced weird results in the past.
252- # It should be okay to do it after the check above.
263+ # this can happen when a node is connected to the same node twice (e.g.
264+ # station on lines with only one track). WARNING: this produced weird
265+ # results in the past. It should be okay to do it after the check above.
253266 connected_edges = [
254267 e
255268 for e in self .topology .edges .values ()
0 commit comments