Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 94 additions & 2 deletions notebooks/user_guide.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -4173,6 +4173,98 @@
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### RSET Maps\n",
"\n",
"RSET (Required Safe Egress Time) maps show the time at which pedestrians occupy each spatial cell.\n",
"The walkable area is divided into a grid and for each cell an aggregation function (e.g. maximum) is applied to the time values of all trajectory points inside that cell.\n",
"\n",
"The method follows Schröder et al., *A Map Representation of the ASET-RSET Concept* (Fire Safety Journal, 2020).\n",
"\n",
"Three aggregation methods are available:\n",
"\n",
"- **MAX**: last time any pedestrian was observed in each cell (the actual RSET)\n",
"- **MIN**: earliest time any pedestrian was observed in each cell\n",
"- **MEAN**: average time pedestrians were observed in each cell\n",
"\n",
":::{note}\n",
"Cells with no trajectory data contain *NaN*. Use `np.nanmax`, `np.nanmin`, etc. instead of `np.max`/`np.min` when computing summary statistics from the returned array\n",
":::"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pedpy import RsetMethod, compute_rset_map\n",
"\n",
"grid_size_rset = 0.25\n",
"\n",
"rset_max = compute_rset_map(\n",
" traj_data=traj,\n",
" walkable_area=walkable_area,\n",
" grid_size=grid_size_rset,\n",
" method=RsetMethod.MAX,\n",
")\n",
"\n",
"rset_min = compute_rset_map(\n",
" traj_data=traj,\n",
" walkable_area=walkable_area,\n",
" grid_size=grid_size_rset,\n",
" method=RsetMethod.MIN,\n",
")\n",
"\n",
"rset_mean = compute_rset_map(\n",
" traj_data=traj,\n",
" walkable_area=walkable_area,\n",
" grid_size=grid_size_rset,\n",
" method=RsetMethod.MEAN,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"import numpy as np\n",
"\n",
"from pedpy import plot_rset_map\n",
"\n",
"fig, (ax0, ax1, ax2) = plt.subplots(nrows=1, ncols=3, layout=\"constrained\")\n",
"fig.set_size_inches(15, 5)\n",
"plot_rset_map(\n",
" walkable_area=walkable_area,\n",
" rset_map=rset_max,\n",
" axes=ax0,\n",
" title=f\"RSET (max) = {np.nanmax(rset_max):.1f} s\",\n",
")\n",
"plot_rset_map(\n",
" walkable_area=walkable_area,\n",
" rset_map=rset_min,\n",
" axes=ax1,\n",
" title=f\"RSET (min) = {np.nanmin(rset_min):.1f} s\",\n",
")\n",
"plot_rset_map(\n",
" walkable_area=walkable_area,\n",
" rset_map=rset_mean,\n",
" axes=ax2,\n",
" title=f\"RSET (mean) = {np.nanmean(rset_mean):.1f} s\",\n",
")\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"metadata": {},
Expand Down Expand Up @@ -5034,7 +5126,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"display_name": ".venv (3.13.7)",
"language": "python",
"name": "python3"
},
Expand All @@ -5048,7 +5140,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.4"
"version": "3.13.7"
}
},
"nbformat": 4,
Expand Down
6 changes: 6 additions & 0 deletions pedpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,12 @@
)
from .methods.profile_calculator import (
DensityMethod,
RsetMethod,
SpeedMethod,
compute_density_profile,
compute_grid_cell_polygon_intersection_area,
compute_profiles,
compute_rset_map,
compute_speed_profile,
get_grid_cells,
)
Expand Down Expand Up @@ -141,6 +143,7 @@
plot_neighborhood,
plot_nt,
plot_profiles,
plot_rset_map,
plot_speed,
plot_speed_at_line,
plot_speed_distribution,
Expand Down Expand Up @@ -193,10 +196,12 @@
"is_trajectory_valid",
"compute_pair_distribution_function",
"DensityMethod",
"RsetMethod",
"SpeedMethod",
"compute_density_profile",
"compute_grid_cell_polygon_intersection_area",
"compute_profiles",
"compute_rset_map",
"compute_speed_profile",
"get_grid_cells",
"SpeedCalculation",
Expand Down Expand Up @@ -225,6 +230,7 @@
"plot_neighborhood",
"plot_nt",
"plot_profiles",
"plot_rset_map",
"plot_speed",
"plot_speed_at_line",
"plot_speed_distribution",
Expand Down
119 changes: 118 additions & 1 deletion pedpy/methods/profile_calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
import numpy.typing as npt
import pandas as pd
import shapely
from scipy import stats

from pedpy.column_identifier import FRAME_COL
from pedpy.column_identifier import FRAME_COL, X_COL, Y_COL
from pedpy.data.geometry import (
AxisAlignedMeasurementArea,
MeasurementArea,
WalkableArea,
)
from pedpy.data.trajectory_data import TrajectoryData
from pedpy.errors import PedPyRuntimeError, PedPyTypeError, PedPyValueError
from pedpy.internal.utils import alias

Expand Down Expand Up @@ -1048,3 +1050,118 @@ def get_grid_cells(
len(y_coords) - 1,
len(x_coords) - 1,
)


class RsetMethod(Enum):
r"""Aggregation method for computing RSET maps.

RSET (Required Safe Egress Time) maps show the time at which
pedestrians occupy each spatial cell. Different aggregation methods
provide different perspectives on the evacuation process.

See: Schröder et al., "A Map Representation of the ASET-RSET Concept"
(Fire Safety Journal, 2020).
"""

MAX = auto()
r"""Maximum time a pedestrian was observed in each cell.

This gives the last time any pedestrian occupied each cell,
representing the required safe egress time (RSET) per cell.
"""

MIN = auto()
r"""Minimum time a pedestrian was observed in each cell.

This gives the earliest time any pedestrian was observed in each
cell.
"""

MEAN = auto()
r"""Mean time pedestrians were observed in each cell."""


_RSET_METHOD_TO_STAT: dict[RsetMethod, str] = {
RsetMethod.MAX: "max",
RsetMethod.MIN: "min",
RsetMethod.MEAN: "mean",
}


def compute_rset_map(
*,
traj_data: TrajectoryData,
walkable_area: Optional[WalkableArea] = None,
axis_aligned_measurement_area: Optional[AxisAlignedMeasurementArea] = None,
grid_size: float,
method: RsetMethod = RsetMethod.MAX,
) -> npt.NDArray[np.float64]:
r"""Compute an RSET (Required Safe Egress Time) map.

The walkable area (or measurement area) is divided into a grid of
square cells. For each cell, the time values of all trajectory
points falling inside the cell are aggregated using *method*
(default: maximum). This yields a 2-D array where each entry
represents the aggregated time in that cell.

The implementation follows Section 5.5.1 of Schröder et al.,
"A Map Representation of the ASET-RSET Concept"
(Fire Safety Journal, 2020).

Args:
traj_data: Trajectory data to analyse.
walkable_area: Geometry over which to compute the map.
axis_aligned_measurement_area: Alternative rectangular area.
grid_size: Side length of each grid cell (in metres).
method: Aggregation method applied to the time values
per cell (default: :attr:`RsetMethod.MAX`).

Returns:
A 2-D NumPy array (rows x cols) with the aggregated time per
cell. Cells with no observations contain NaN.
"""
get_grid_cells(
walkable_area=walkable_area,
axis_aligned_measurement_area=axis_aligned_measurement_area,
grid_size=grid_size,
)

if walkable_area is not None:
min_x, min_y, max_x, max_y = walkable_area.bounds
if axis_aligned_measurement_area is not None:
min_x, min_y, max_x, max_y = axis_aligned_measurement_area.bounds

x_edges = np.arange(min_x, max_x + grid_size, grid_size)
y_edges = np.arange(min_y, max_y + grid_size, grid_size)

if not isinstance(traj_data, TrajectoryData):
raise PedPyTypeError(f"traj_data must be an instance of TrajectoryData, got {type(traj_data).__name__}.")

data = traj_data.data
time = data[FRAME_COL].to_numpy(dtype=np.float64) / traj_data.frame_rate

# Validate aggregation method to provide consistent, helpful errors.
if method is None:
raise PedPyTypeError("method must be an instance of RsetMethod, not None.")
if not isinstance(method, RsetMethod):
raise PedPyTypeError(f"method must be an instance of RsetMethod, got {type(method).__name__}.")
if method not in _RSET_METHOD_TO_STAT:
valid_methods = ", ".join(m.name for m in _RSET_METHOD_TO_STAT)
raise PedPyValueError(f"Unknown RsetMethod '{method}'. Valid values are: {valid_methods}.")
stat_func = _RSET_METHOD_TO_STAT[method]

result = stats.binned_statistic_2d(
x=data[X_COL].to_numpy(dtype=np.float64),
y=data[Y_COL].to_numpy(dtype=np.float64),
values=time,
statistic=stat_func,
bins=[x_edges, y_edges],
)

rset = result.statistic.T

# Flip vertically so that row 0 corresponds to the top (max y),
# consistent with the orientation used by get_grid_cells and imshow.
rset = np.flipud(rset)

return rset
91 changes: 91 additions & 0 deletions pedpy/plotting/plotting.py
Original file line number Diff line number Diff line change
Expand Up @@ -1350,6 +1350,97 @@ def plot_profiles(
return axes


def plot_rset_map(
*,
walkable_area: WalkableArea,
rset_map: NDArray[np.float64],
measurement_area: Optional[AxisAlignedMeasurementArea] = None,
axes: Optional[matplotlib.axes.Axes] = None,
**kwargs: Any,
) -> matplotlib.axes.Axes:
"""Plot an RSET (Required Safe Egress Time) map.

Args:
walkable_area(WalkableArea): walkable area of the plot
rset_map: 2-D array as returned by
:func:`~profile_calculator.compute_rset_map`
measurement_area (AxisAlignedMeasurementArea): measurement area
used when computing the RSET map (optional)
axes (matplotlib.axes.Axes): Axes to plot on, if None new will
be created
kwargs: Additional parameters to change the plot appearance

Keyword Args:
title (optional): title of the plot
cmap (optional): colormap (default ``"jet"``)
vmin (optional): minimum value for the colormap
vmax (optional): maximum value for the colormap
label (optional): colorbar label (default ``"time / s"``)
font_size (optional): font size for axis and colorbar labels
(default 14)
title_size (optional): font size for the title (default 16)
tick_size (optional): font size for tick labels (default 12)
walkable_color (optional): color of the walkable area border
hole_color (optional): background color of holes
hole_alpha (optional): alpha of background color for holes

Returns:
matplotlib.axes.Axes instance where the RSET map is plotted
"""
bounds = walkable_area.bounds

if measurement_area is not None:
bounds = measurement_area.bounds

title = kwargs.pop("title", "RSET map")
walkable_color = kwargs.pop("walkable_color", "w")
hole_color = kwargs.pop("hole_color", "w")
hole_alpha = kwargs.pop("hole_alpha", 1.0)
vmin = kwargs.pop("vmin", np.nanmin(rset_map))
vmax = kwargs.pop("vmax", np.nanmax(rset_map))
label = kwargs.pop("label", "time / s")

font_size = kwargs.pop("font_size", 14)
title_size = kwargs.pop("title_size", 16)
tick_size = kwargs.pop("tick_size", 12)

if axes is None:
axes = plt.gca()

axes.set_title(title, fontsize=title_size)
axes.set_xlabel("x / m", fontsize=font_size)
axes.set_ylabel("y / m", fontsize=font_size)
axes.tick_params(labelsize=tick_size)
imshow = axes.imshow(
rset_map,
extent=(bounds[0], bounds[2], bounds[1], bounds[3]),
cmap=kwargs.pop("cmap", "jet"),
vmin=vmin,
vmax=vmax,
**kwargs,
)
divider = make_axes_locatable(axes)
cax = divider.append_axes("right", size="5%", pad=0.05)
fig = plt.gcf()

cb = fig.colorbar(imshow, cax=cax, orientation="vertical", label=label)
cb.ax.tick_params(labelsize=tick_size)
cb.set_label(label, fontsize=font_size)

axes.plot(*walkable_area.polygon.exterior.xy, color=walkable_color)
plot_walkable_area(
walkable_area=walkable_area,
axes=axes,
hole_color=hole_color,
hole_alpha=hole_alpha,
title=title,
x_label=axes.get_xlabel(),
y_label=axes.get_ylabel(),
)

return axes


def plot_walkable_area(
*,
walkable_area: WalkableArea,
Expand Down
Loading