Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Scripts/assignment/assignment_period.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pandas

from typing import TYPE_CHECKING, Any, Dict, Optional, Union
from events.model_system_event_listener import EventHandler
from events.event_handler import EventHandler
import utils.log as log
import parameters.assignment as param
import parameters.zone as zone_param
Expand Down
2 changes: 1 addition & 1 deletion Scripts/assignment/emme_assignment.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import parameters.zone as zone_param
from assignment.abstract_assignment import AssignmentModel
from assignment.assignment_period import AssignmentPeriod
from events.model_system_event_listener import EventHandler
from events.event_handler import EventHandler
if TYPE_CHECKING:
from assignment.emme_bindings.emme_project import EmmeProject
from assignment.datatypes.transit_fare import TransitFareZoneSpecification
Expand Down
14 changes: 13 additions & 1 deletion Scripts/datahandling/zonedata.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,25 @@ def get_data(self, key: str, bounds: slice, generation: bool=False) -> Union[pan
else: # Return matrix (purpose zones -> all zones)
return val[bounds, :]

def get_zone_data(self) -> pandas.DataFrame:
"""Get all zone data as Pandas DataFrame

Returns
-------
pandas DataFrame
All zone data
"""
df = pandas.DataFrame({k:v for k,v in self._values.items() if isinstance(v, pandas.Series)})
df.index.name = "zone_id"
return df

def export_data(self, export_file: Path):
"""Export Pandas Series zone data into a single CSV file

Args:
export_file (Path): Path to the destination file
"""
df = pandas.DataFrame({k:v for k,v in self._values.items() if isinstance(v, pandas.Series)})
df = self.get_zone_data()
df.to_csv(export_file)


Expand Down
68 changes: 68 additions & 0 deletions Scripts/events/event_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from sys import gettrace
import importlib.util
from utils import log
from pathlib import Path
from events.model_system_event_listener import ModelSystemEventListener


class EventHandler(ModelSystemEventListener):
"""Event handler that calls all equivalent methods in all other ModelSystemEventListener classes."""
def __init__(self):
"""Initialize the EventHandler.

Args:
model_system (ModelSystem): ModelSystem instance.
"""
super().__init__()
self.listeners = []
self._create_methods()

def register_listener(self, listener: ModelSystemEventListener):
self.listeners.append(listener)

def load_listeners(self, listener_path: Path):
"""Load all listeners from a given path.

Args:
listener_path (str): The path to the listeners.
"""
for file_path in listener_path.glob("*.py"):
if file_path.name != "__init__.py":
try:
module_name = file_path.stem
spec = importlib.util.spec_from_file_location(module_name, file_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
for attr_name in dir(module):
attr = getattr(module, attr_name)
if isinstance(attr, type) and issubclass(attr, ModelSystemEventListener) and attr is not ModelSystemEventListener:
self.register_listener(attr())
log.info(f"Loaded listener {attr.__name__} from {file_path}")
except Exception as e:
log.error(f"Error loading listener from {file_path}: {e}")


def _create_methods(self):
"""Create methods that call all equivalent methods in all other ModelSystemEventListener classes.
Methods area automatically created for all methods that start with "on_" in all ModelSystemEventListener classes.
"""
for method_name in dir(ModelSystemEventListener):
if method_name.startswith("on_") and callable(getattr(ModelSystemEventListener, method_name)):
setattr(self, method_name, self._create_method(method_name))

def _create_method(self, method_name):
"""Create a method that calls all equivalent methods in all other ModelSystemEventListener classes.

Args:
method_name (str): name of the method to create.
"""
def method(*args, **kwargs):
for listener in self.listeners:
try:
getattr(listener, method_name)(*args, **kwargs)
except Exception as e:
if gettrace() is not None:
# Re-raise exception if debugger is attached
raise e
log.error(f"Error in {listener.__class__.__name__}.{method_name}: {e}")
return method
78 changes: 15 additions & 63 deletions Scripts/events/model_system_event_listener.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
from abc import ABC
from typing import TYPE_CHECKING, Dict, Union
from sys import gettrace
from utils import log
from pathlib import Path
import importlib.util

if TYPE_CHECKING:
import pandas as pd
Expand All @@ -20,7 +17,7 @@
from demand.trips import DemandModel
from argparse import Namespace
from utils.validation import Validation
from assignment.emme_assignment import EmmeAssignmentModel
from assignment.emme_assignment import EmmeAssignmentModel, AssignmentModel

class ModelSystemEventListener(ABC):

Expand Down Expand Up @@ -62,12 +59,25 @@ def on_zone_data_loaded(self, base_data: 'ZoneData', forecast_data: 'ZoneData')
"""
pass

def on_model_system_initialized(self, model_system: 'ModelSystem') -> None:
def on_model_system_initialized(self,
model_system: 'ModelSystem',
zone_data_path: str,
base_zone_data_path: str,
base_matrices_path: str,
results_path: str,
assignment_model: 'AssignmentModel',
name: str) -> None:
"""
Event handler that is called when the model system is initialized.

Args:
model_system (ModelSystem): The model system.
zone_data_path (str): The path to the zone data.
base_zone_data_path (str): The path to the base zone data.
base_matrices_path (str): The path to the base matrices.
results_path (str): The path to the results.
assignment_model (AssignmentModel): The assignment model.
name (str): The name of the model system.
"""
pass

Expand Down Expand Up @@ -295,61 +305,3 @@ def on_daily_results_aggregated(self, assignment_model: 'EmmeAssignmentModel', d
pass


class EventHandler(ModelSystemEventListener):
"""Event handler that calls all equivalent methods in all other ModelSystemEventListener classes."""
def __init__(self):
"""Initialize the EventHandler.

Args:
model_system (ModelSystem): ModelSystem instance.
"""
super().__init__()
self.listeners = []
self._create_methods()

def register_listener(self, listener: ModelSystemEventListener):
self.listeners.append(listener)

def load_listeners(self, listener_path: Path):
"""Load all listeners from a given path.

Args:
listener_path (str): The path to the listeners.
"""
for file_path in listener_path.glob("*.py"):
if file_path.name != "__init__.py":
module_name = file_path.stem
spec = importlib.util.spec_from_file_location(module_name, file_path)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
for attr_name in dir(module):
attr = getattr(module, attr_name)
if isinstance(attr, type) and issubclass(attr, ModelSystemEventListener) and attr is not ModelSystemEventListener:
self.register_listener(attr())
log.info(f"Loaded listener {attr.__name__} from {file_path}")


def _create_methods(self):
"""Create methods that call all equivalent methods in all other ModelSystemEventListener classes.
Methods area automatically created for all methods that start with "on_" in all ModelSystemEventListener classes.
"""
for method_name in dir(ModelSystemEventListener):
if method_name.startswith("on_") and callable(getattr(ModelSystemEventListener, method_name)):
setattr(self, method_name, self._create_method(method_name))

def _create_method(self, method_name):
"""Create a method that calls all equivalent methods in all other ModelSystemEventListener classes.

Args:
method_name (str): name of the method to create.
"""
def method(*args, **kwargs):
for listener in self.listeners:
try:
getattr(listener, method_name)(*args, **kwargs)
except Exception as e:
if gettrace() is not None:
# Re-raise exception if debugger is attached
raise e
log.error(f"Error in {listener.__class__.__name__}.{method_name}: {e}")
return method
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,14 @@ def __init__(self):
super().__init__()
self.mode_demands = []

def on_model_system_initialized(self, model_system: 'ModelSystem'):
def on_model_system_initialized(self,
model_system: 'ModelSystem',
zone_data_path: str,
base_zone_data_path: str,
base_matrices_path: str,
results_path: str,
assignment_model: 'AssignmentModel',
name: str) -> None:
# Get result path when model system is initialized
self.result_path = Path(model_system.resultdata.path) / 'mode_analysis_results.csv'

Expand Down
111 changes: 111 additions & 0 deletions Scripts/events/standard_modules/gpkg_results.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
from pathlib import Path
from typing import TYPE_CHECKING, Union
import pandas as pd
import numpy as np
from utils import log

from events.model_system_event_listener import ModelSystemEventListener

if TYPE_CHECKING:
from modelsystem import ModelSystem
from datatypes.purpose import TourPurpose
from assignment.emme_assignment import AssignmentModel

class GpkgResult(ModelSystemEventListener):
"""
A class to print zone and network data to results directory.
"""
result_path: Path
""" The path to the result file. """

is_last_iteration: bool
""" Flag to indicate if this is the last iteration. """

model_system: 'ModelSystem'
""" The model system instance. """

zone_gpkg_path: Path
""" The path to the zone data. """

parking_time: pd.Series

def __init__(self):
super().__init__()
self.is_last_iteration = False


def on_model_system_initialized(self,
model_system: 'ModelSystem',
zone_data_path: str,
base_zone_data_path: str,
base_matrices_path: str,
results_path: str,
assignment_model: 'AssignmentModel',
name: str) -> None:
# Get result path when model system is initialized
self.model_system = model_system
self.result_path = Path(model_system.resultdata.path) / 'model_data.gpkg'
self.zone_gpkg_path = Path(zone_data_path) / 'zones.gpkg'
if not self.zone_gpkg_path.exists():
self.zone_gpkg_path = Path(base_zone_data_path) / 'zones.gpkg'


def on_iteration_started(self, iteration, previous_impedance):
if iteration == 'last' or iteration is None:
self.is_last_iteration = True

def on_parking_time_calculated(self, purpose: 'TourPurpose', parking_time: np.ndarray):
if not self.is_last_iteration:
return
# Create a DataFrame from parking_time using purpose.zone_data zone numbers as index
self.parking_itme_df = pd.DataFrame(parking_time, index=purpose.zone_data.zone_numbers, columns=['parking_time'])
self.parking_itme_df.index.name = 'zone_id'
# Save the DataFrame to a CSV file

def on_iteration_complete(self, iteration: Union[int, str], impedance, gap):
if not self.is_last_iteration:
return

zone_data_df = self.model_system.zdata_forecast.get_zone_data()
try:
import geopandas as gpd
import logging
logging.getLogger("pyogrio").setLevel(logging.ERROR)

except ImportError:
raise ImportError("geopandas is not installed. Please install it to use this feature to export GPKG data.")
if not self.zone_gpkg_path.exists():
log.warn(f"Zone data file not found in {self.zone_gpkg_path}. Zone data export disabled.")
return
zone_gdf = gpd.read_file(self.zone_gpkg_path, layer='polygons').set_index('zone_id')
# Join zone_gdf and zone_data_df using the index
zone_gdf = zone_gdf.join(zone_data_df, how='left')
# Join parking_time into zone_gdf
zone_gdf = zone_gdf.join(self.parking_itme_df, how='left')
# Write the updated GeoDataFrame to the result path
zone_gdf.to_file(self.result_path, layer='zone_data', driver='GPKG')

def on_daily_results_aggregated(self, assignment_model, day_network):
if not self.is_last_iteration:
return
try:
import geopandas as gpd
import logging
logging.getLogger("pyogrio").setLevel(logging.ERROR)
except ImportError:
raise ImportError("geopandas is not installed. Please install it to use this feature to export GPKG data.")
from utils.geodata_helpers import (
get_links,
get_nodes,
get_transit_lines,
get_transit_segments
)

scenario = assignment_model.day_scenario

get_links(day_network, scenario).to_file(self.result_path, layer='links', driver='GPKG')
get_nodes(day_network, scenario).to_file(self.result_path, layer='nodes', driver='GPKG')
# Get the network and scenario from the assignment model
gpd.GeoDataFrame(get_transit_lines(day_network, scenario), geometry=None).to_file(self.result_path, layer='transit_lines', driver='GPKG')
gpd.GeoDataFrame(get_transit_segments(day_network, scenario), geometry=None).to_file(self.result_path, layer='transit_segments', driver='GPKG')
# Get links and nodes from the network and scenario
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
if TYPE_CHECKING:
from modelsystem import ModelSystem
from datatypes.purpose import TourPurpose
from assignment.emme_assignment import AssignmentModel


class ParkingTimeResult(ModelSystemEventListener):
Expand All @@ -20,9 +21,16 @@ class ParkingTimeResult(ModelSystemEventListener):
def __init__(self):
super().__init__()

def on_model_system_initialized(self, model_system: 'ModelSystem'):
def on_model_system_initialized(self,
model_system: 'ModelSystem',
zone_data_path: str,
base_zone_data_path: str,
base_matrices_path: str,
results_path: str,
assignment_model: 'AssignmentModel',
name: str) -> None:
# Get result path when model system is initialized
self.result_path = Path(model_system.resultdata.path) / 'parking_time.csv'
self.result_path = Path(results_path) / name / 'parking_time.csv'

def on_parking_time_calculated(self, purpose: 'TourPurpose', parking_time: np.ndarray):
# Create a DataFrame from parking_time using purpose.zone_data zone numbers as index
Expand Down
Loading