-
Notifications
You must be signed in to change notification settings - Fork 67
ENH: Added Cellular Automata-based Enclosed Tessellation #686
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
551871d
5b4a06e
0b3a519
d754ce4
37cd109
3cab43b
5f77453
15f9e44
7da1bc8
89c7c55
bc4d83c
552cead
9879473
9e85279
df21245
c9a79e3
374e226
63f11a3
7c588b0
7d534c7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ | |
GPD_GE_013 = Version(gpd.__version__) >= Version("0.13.0") | ||
GPD_GE_10 = Version(gpd.__version__) >= Version("1.0dev") | ||
LPS_GE_411 = Version(libpysal.__version__) >= Version("4.11.dev") | ||
SPL_GE_210 = Version(shapely.__version__) >= Version("2.1.0rc1") | ||
|
||
__all__ = [ | ||
"morphological_tessellation", | ||
|
@@ -137,6 +138,7 @@ | |
cell_size: float = 1.0, | ||
neighbor_mode: str = "moore", | ||
barriers_for_inner: GeoSeries | GeoDataFrame = None, | ||
cell_tolerance: float = 2.0 | ||
) -> GeoDataFrame: | ||
"""Generate enclosed tessellation | ||
|
||
|
@@ -283,6 +285,7 @@ | |
cell_size, | ||
neighbor_mode, | ||
barriers_for_inner, | ||
cell_tolerance | ||
) | ||
for t in tuples | ||
) | ||
|
@@ -328,6 +331,7 @@ | |
cell_size, | ||
neighbor_mode, | ||
barriers_for_inner, | ||
cell_tolerance | ||
): | ||
"""Generate tessellation for a single enclosure. Helper for enclosed_tessellation""" | ||
# check if threshold is set and filter buildings based on the threshold | ||
|
@@ -338,7 +342,7 @@ | |
> (shapely.area(blg.geometry.array) * threshold) | ||
] | ||
else: | ||
blg = blg[ | ||
shapely.area( | ||
shapely.intersection( | ||
blg.geometry.array, shapely.geometry.Polygon(poly.boundary) | ||
|
@@ -358,12 +362,13 @@ | |
as_gdf=True, | ||
) | ||
else: | ||
tess = _voronoi_by_ca( | ||
seed_geoms=blg, | ||
barrier_geoms=poly, | ||
cell_size=cell_size, | ||
neighbor_mode=neighbor_mode, | ||
barriers_for_inner=barriers_for_inner, | ||
cell_tolerance=cell_tolerance * cell_size, | ||
) | ||
tess[enclosure_id] = ix | ||
return tess | ||
|
@@ -388,6 +393,7 @@ | |
cell_size: float = 1.0, | ||
neighbor_mode: str = "moore", | ||
barriers_for_inner: GeoSeries | GeoDataFrame = None, | ||
cell_tolerance: float = 2.0, | ||
martinfleis marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
) -> GeoDataFrame: | ||
""" | ||
Generate an aggregated Voronoi tessellation as a GeoDataFrame via a cellular automata. | ||
|
@@ -409,106 +415,131 @@ | |
barrier_geoms: GeoDataFrame containing barrier features or a shapely Polygon. | ||
cell_size: Grid cell size. By default it is 1.0. | ||
neighbor_mode: Choice of neighbor connectivity ('moore' or 'neumann'). By default it is 'moore'. | ||
barriers_for_inner: GeoDataFrame containing inner barriers to be included. By default it is None. | ||
barriers_for_inner: GeoDataFrame containing inner barriers to be included. By default it is None | ||
cell_tolerance: Tolerance for simplifying the grid cells. By default it is 2.0. | ||
|
||
|
||
Returns: | ||
A GeoDataFrame representing the aggregated Voronoi tessellation, clipped by barriers. | ||
""" | ||
|
||
# If there is barriers_for_inner, add the intersected or contained barriers to the barrier_geoms | ||
if barriers_for_inner is not None: | ||
# get inner barriers | ||
inner_barriers = _get_inner_barriers( | ||
barrier_geoms, barriers_for_inner.to_crs(seed_geoms.crs) | ||
) | ||
|
||
if barrier_geoms.geom_type == "Polygon": | ||
# Wrap a single barrier geometry (Polygon, LineString, or GeometryCollection) | ||
barrier_geoms = GeoSeries([barrier_geoms.boundary], crs=seed_geoms.crs) | ||
# Handle barrier_geoms if it is a Polygon or MultiPolygon | ||
if barrier_geoms.geom_type == "Polygon": | ||
# Take buffer of polygon and extract its exterior boundary | ||
barrier_geoms_buffered = GeoSeries( | ||
[barrier_geoms.buffer(10).exterior], crs=seed_geoms.crs | ||
|
||
) | ||
barrier_geoms = GeoSeries([barrier_geoms], crs=seed_geoms.crs) | ||
|
||
elif barrier_geoms.geom_type == "MultiPolygon": | ||
# Process each polygon: take buffer then exterior boundary (to ensure there's no gap between enclosures) | ||
barrier_geoms_buffered = GeoSeries( | ||
[poly.buffer(10).exterior for poly in barrier_geoms.geoms], | ||
crs=seed_geoms.crs, | ||
) | ||
yu-ta-sato marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
barrier_geoms = GeoSeries(barrier_geoms, crs=seed_geoms.crs) | ||
|
||
if len(inner_barriers) > 0: | ||
# add inner barriers as a list of geometry to the barrier_geoms | ||
barrier_geoms = pd.concat( | ||
[barrier_geoms, inner_barriers], ignore_index=True | ||
) | ||
else: | ||
if barrier_geoms.geom_type == "Polygon": | ||
# Wrap a single barrier geometry (Polygon, LineString, or GeometryCollection) | ||
barrier_geoms = GeoSeries([barrier_geoms.boundary], crs=seed_geoms.crs) | ||
raise ValueError("barrier_geoms must be a Polygon or MultiPolygon") | ||
|
||
outer_union = shapely.ops.unary_union(barrier_geoms_buffered) | ||
yu-ta-sato marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
# Compute inner barriers union if available | ||
if ( | ||
"inner_barriers" in locals() | ||
and inner_barriers is not None | ||
and not inner_barriers.empty | ||
): | ||
inner_union = shapely.ops.unary_union(inner_barriers.geometry) | ||
yu-ta-sato marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
else: | ||
inner_union = None | ||
|
||
# Combine outer barrier with inner barriers | ||
if outer_union and inner_union: | ||
prep_barrier = shapely.ops.unary_union([outer_union, inner_union]) | ||
yu-ta-sato marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
elif outer_union: | ||
prep_barrier = outer_union | ||
elif inner_union: | ||
prep_barrier = inner_union | ||
else: | ||
prep_barrier = None | ||
|
||
# Compute grid bounds | ||
origin, grid_width, grid_height = _get_grid_bounds( | ||
seed_geoms, barrier_geoms, cell_size | ||
seed_geoms, barrier_geoms_buffered, cell_size | ||
) | ||
|
||
# Prepare barrier geometries if provided. | ||
barrier_union = None | ||
prep_barrier = None | ||
if not barrier_geoms.empty: | ||
barrier_union = shapely.ops.unary_union(barrier_geoms.geometry) | ||
prep_barrier = shapely.prepared.prep(barrier_union) | ||
|
||
# Initialize grid states with UNKNOWN values. | ||
states = np.full((grid_height, grid_width), CellState.UNKNOWN.value, dtype=int) | ||
|
||
|
||
xs, ys = np.meshgrid(np.arange(grid_width), np.arange(grid_height)) | ||
cell_polys = GeoSeries( | ||
[ | ||
_get_cell_polygon(x, y, cell_size, origin) | ||
for x, y in zip(xs.flatten(), ys.flatten()) | ||
] | ||
) | ||
|
||
# Identify barrier cells in the grid | ||
if prep_barrier is not None: | ||
xs, ys = np.meshgrid(np.arange(grid_width), np.arange(grid_height)) | ||
cell_polys = GeoSeries( | ||
[ | ||
_get_cell_polygon(x, y, cell_size, origin) | ||
for x, y in zip(xs.flatten(), ys.flatten()) | ||
] | ||
barrier_mask = cell_polys.intersects(prep_barrier).values.reshape( | ||
grid_height, grid_width | ||
) | ||
barrier_mask = cell_polys.intersects(barrier_union).values.reshape( | ||
(grid_height, grid_width) | ||
) | ||
states[barrier_mask] = CellState.BARRIER.value | ||
else: | ||
barrier_mask = np.zeros((grid_height, grid_width), dtype=bool) | ||
states[barrier_mask] = CellState.BARRIER.value | ||
|
||
# Seed the grid with seed geometries. | ||
for site_id, geom in enumerate(seed_geoms.geometry): | ||
if not geom.is_empty: | ||
cells = _geom_to_cells(geom, origin, cell_size, grid_width, grid_height) | ||
valid_cells = [ | ||
(x, y) for x, y in cells if states[y, x] == CellState.UNKNOWN.value | ||
] | ||
if valid_cells: | ||
indices = np.array(valid_cells) | ||
states[indices[:, 1], indices[:, 0]] = site_id | ||
|
||
# Initialize the BFS queue with all seeded cells’ neighbors. | ||
queue = deque() | ||
seed_indices = np.argwhere(states >= 0) | ||
for y, x in seed_indices: | ||
_enqueue_neighbors(x, y, states, grid_width, grid_height, neighbor_mode, queue) | ||
|
||
# Process BFS to propagate seed values. | ||
while queue: | ||
# Dequeue the current cell and skip if it is not a frontier cell. | ||
x_current, y_current = queue.popleft() | ||
if states[y_current, x_current] != CellState.FRONTIER.value: | ||
continue | ||
|
||
# Get neighbor cells that were already assigned a seed id or still unknown (state >= 0). | ||
# Note that boundary or barrier cells are skipped (state < 0). | ||
neighbor_seeds = [ | ||
states[ny, nx] | ||
for nx, ny in _get_neighbors( | ||
x_current, y_current, grid_width, grid_height, mode=neighbor_mode | ||
) | ||
if states[ny, nx] >= 0 | ||
] | ||
if not neighbor_seeds: | ||
continue | ||
|
||
# Assign as a boundary if multiple seed ids are found. | ||
if len(set(neighbor_seeds)) > 1: | ||
states[y_current, x_current] = CellState.BOUNDARY.value | ||
# EIf not, equeue neighbor cells for further propagation. | ||
else: | ||
assigned_seed = set(neighbor_seeds).pop() | ||
states[y_current, x_current] = assigned_seed | ||
_enqueue_neighbors( | ||
x_current, | ||
y_current, | ||
states, | ||
|
@@ -519,20 +550,20 @@ | |
) | ||
|
||
# Post-process barrier and boundary cells using a voting mechanism. | ||
states = _assign_adjacent_seed_cells(states, neighbor_mode) | ||
|
||
# Create grid cell polygons and build a GeoDataFrame. | ||
xs, ys = np.meshgrid(np.arange(grid_width), np.arange(grid_height)) | ||
grid_polys = [ | ||
_get_cell_polygon(x, y, cell_size, origin) | ||
for x, y in zip(xs.flatten(), ys.flatten()) | ||
] | ||
grid_gdf = GeoDataFrame( | ||
{"site_id": states.flatten()}, geometry=grid_polys, crs=seed_geoms.crs | ||
) | ||
|
||
# Include only cells with valid seed assignments and dissolve contiguous regions. | ||
grid_gdf = ( | ||
grid_gdf[grid_gdf["site_id"] >= 0] | ||
.dissolve(by="site_id") | ||
.reset_index() | ||
|
@@ -540,20 +571,31 @@ | |
) | ||
|
||
# Clip by barriers | ||
if barrier_geoms is not None and (not barrier_geoms.empty): | ||
# Create a union of the barrier geometries. | ||
barrier_union = shapely.ops.unary_union(barrier_geoms.geometry) | ||
yu-ta-sato marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
# If the barrier union is not a polygon (e.g., it's a MultiLineString), polygonize it. | ||
if not isinstance( | ||
barrier_union, (shapely.geometry.Polygon, shapely.geometry.MultiPolygon) | ||
): | ||
barrier_polys = list(shapely.ops.polygonize(barrier_union)) | ||
if barrier_polys: | ||
barrier_union = shapely.ops.unary_union(barrier_polys) | ||
# Clip each polygon in the grid using the barrier boundary. | ||
grid_gdf["geometry"] = grid_gdf["geometry"].intersection(barrier_union) | ||
|
||
if cell_tolerance is not None: | ||
if SPL_GE_210: | ||
# Simplify coverages with the parameter cell_tolerance as the area threshold of Visvalingam-Whyatt algorithm. | ||
grid_gdf["geometry"] = shapely.coverage_simplify( | ||
grid_gdf["geometry"].array, tolerance=cell_tolerance | ||
) | ||
else: | ||
warnings.warn( | ||
"Shapely 2.1.0 or higher is required for coverage_simplify. Skipping." | ||
) | ||
martinfleis marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
return grid_gdf | ||
|
||
|
||
def _get_inner_barriers(enclosure, barriers): | ||
|
@@ -569,14 +611,17 @@ | |
and any intersecting barriers. | ||
""" | ||
# Find barriers intersecting or contained in the enclosure | ||
inner_barriers = barriers[ | ||
barriers.intersects(enclosure) | barriers.contains(enclosure) | ||
] | ||
|
||
# Clip those segments to stay within the enclosure | ||
inner_barriers = gpd.clip(inner_barriers, enclosure) | ||
|
||
# Only keep the geometry which is within the enclosure | ||
inner_barriers = inner_barriers[inner_barriers.within(enclosure)] | ||
|
||
return GeoSeries(inner_barriers.geometry, crs=barriers.crs) | ||
|
||
|
||
|
||
class CellState(Enum): | ||
|
@@ -602,8 +647,8 @@ | |
""" | ||
Generate a grid cell polygon based on the given indices, cell size, and origin. | ||
""" | ||
ox, oy = origin | ||
return shapely.geometry.Polygon( | ||
yu-ta-sato marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
[ | ||
(ox + x_idx * cell_size, oy + y_idx * cell_size), | ||
(ox + (x_idx + 1) * cell_size, oy + y_idx * cell_size), | ||
|
@@ -627,14 +672,14 @@ | |
Returns: | ||
A list of (x, y) tuples for valid neighbor indices. | ||
""" | ||
neighbor_dirs = { | ||
"moore": [(-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1)], | ||
"neumann": [(0, -1), (-1, 0), (1, 0), (0, 1)], | ||
} | ||
directions = neighbor_dirs.get(mode) | ||
if directions is None: | ||
raise ValueError("Invalid neighbor_mode: choose 'moore' or 'neumann'") | ||
return [ | ||
(x + dx, y + dy) | ||
for dx, dy in directions | ||
if 0 <= x + dx < max_x and 0 <= y + dy < max_y | ||
|
@@ -646,19 +691,21 @@ | |
barrier_geoms: GeoSeries | GeoDataFrame, | ||
cell_size: float, | ||
) -> Tuple[Tuple[float, float], int, int]: | ||
""" | ||
Compute the grid bounds required to cover both seed and barrier geometries. | ||
""" | ||
seed_bounds = seed_geoms.total_bounds # [xmin, ymin, xmax, ymax] | ||
barrier_bounds = barrier_geoms.total_bounds | ||
|
||
xmin = min(seed_bounds[0], barrier_bounds[0]) | ||
ymin = min(seed_bounds[1], barrier_bounds[1]) | ||
xmax = max(seed_bounds[2], barrier_bounds[2]) | ||
ymax = max(seed_bounds[3], barrier_bounds[3]) | ||
grid_width = math.ceil((xmax - xmin) / cell_size) | ||
grid_height = math.ceil((ymax - ymin) / cell_size) | ||
return (xmin, ymin), grid_width, grid_height | ||
# expand bounds by 1 cell in each direction | ||
new_xmin = xmin - cell_size | ||
new_ymin = ymin - cell_size | ||
new_xmax = xmax + cell_size | ||
new_ymax = ymax + cell_size | ||
grid_width = math.ceil((new_xmax - new_xmin) / cell_size) | ||
grid_height = math.ceil((new_ymax - new_ymin) / cell_size) | ||
return (new_xmin, new_ymin), grid_width, grid_height | ||
|
||
|
||
def _geom_to_cells( | ||
|
@@ -671,29 +718,29 @@ | |
""" | ||
Determine grid cell indices that intersect the given geometry. | ||
""" | ||
if isinstance(geom, shapely.geometry.Point): | ||
sx = int((geom.x - origin[0]) // cell_size) | ||
sy = int((geom.y - origin[1]) // cell_size) | ||
return [(sx, sy)] if 0 <= sx < grid_width and 0 <= sy < grid_height else [] | ||
|
||
else: | ||
minx, miny, maxx, maxy = geom.bounds | ||
start_x = max(0, int((minx - origin[0]) // cell_size)) | ||
start_y = max(0, int((miny - origin[1]) // cell_size)) | ||
end_x = min(grid_width, int(math.ceil((maxx - origin[0]) / cell_size))) | ||
end_y = min(grid_height, int(math.ceil((maxy - origin[1]) / cell_size))) | ||
|
||
x_range = np.arange(start_x, end_x) | ||
y_range = np.arange(start_y, end_y) | ||
xx, yy = np.meshgrid(x_range, y_range) | ||
candidate_polys = GeoSeries( | ||
[ | ||
_get_cell_polygon(x, y, cell_size, origin) | ||
for x, y in zip(xx.flatten(), yy.flatten()) | ||
] | ||
) | ||
mask = candidate_polys.intersects(geom) | ||
return list(zip(xx.flatten()[mask], yy.flatten()[mask])) | ||
|
||
|
||
def _enqueue_neighbors( | ||
|
@@ -708,10 +755,10 @@ | |
""" | ||
Enqueue valid neighboring cells for BFS expansion. | ||
""" | ||
for nx, ny in _get_neighbors(x, y, grid_width, grid_height, mode=neighbor_mode): | ||
if states[ny, nx] == CellState.UNKNOWN.value: | ||
states[ny, nx] = CellState.FRONTIER.value | ||
queue.append((nx, ny)) | ||
|
||
|
||
def _assign_adjacent_seed_cells( | ||
|
@@ -720,26 +767,26 @@ | |
""" | ||
Reassign border and barrier cells to the proximate seed areas using a voting mechanism. | ||
""" | ||
new_states = states.copy() | ||
indices = np.argwhere( | ||
np.isin(states, [CellState.BARRIER.value, CellState.BOUNDARY.value]) | ||
) | ||
grid_height, grid_width = states.shape | ||
|
||
for y, x in indices: | ||
neighbor_seeds = [ | ||
states[ny, nx] | ||
for nx, ny in _get_neighbors( | ||
x, y, grid_width, grid_height, mode=neighbor_mode | ||
) | ||
if states[ny, nx] >= 0 | ||
] | ||
if neighbor_seeds: | ||
cnt = Counter(neighbor_seeds) | ||
# In case of ties, choose the smaller seed id. | ||
chosen_seed = min(cnt.items(), key=lambda item: (-item[1], item[0]))[0] | ||
new_states[y, x] = chosen_seed | ||
return new_states | ||
|
||
|
||
def verify_tessellation(tessellation, geometry): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the rationale behind this? Why are you ignoring holes when a MultiPolygon is provided? Feels inconsistent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The initial intention was to let the poly be accommodated with inner barriers, but now they're separated. Somehow this part remained, so will remove it.