Commit 3974bf5

Merge pull request #991 from mapswipe/improve-ressource-usage
refactor: improve RAM usage and calculation time for StreetProject
2 parents 2a458b5 + d8a5833
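
The gist of the diff below: the per-tile download in coordinate_download moves from a ThreadPoolExecutor to a ProcessPoolExecutor (new parallelized_processing helper), and the AoI clip plus filter_results now run inside each worker, so only small, already-filtered per-tile DataFrames are held in memory instead of all raw tile features. A minimal sketch of that pattern, assuming a hypothetical process_tile worker and toy data standing in for the project's download_and_process_tile:

from concurrent.futures import ProcessPoolExecutor
from functools import partial


def process_tile(tile, polygon, filters):
    # Hypothetical worker: the real code downloads one vector tile, keeps only
    # points inside the AoI polygon and applies filter_results(), returning a
    # small DataFrame instead of the raw tile contents.
    return {"tile": tile, "polygon": polygon, **filters}


def run(tiles, polygon, filters, workers=4):
    # partial() binds the shared arguments so executor.map() only has to send
    # the per-tile record to each worker process; separate processes also
    # sidestep the GIL for the CPU-bound tile decoding.
    worker = partial(process_tile, polygon=polygon, filters=filters)
    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(worker, tiles))
    return [r for r in results if r is not None]


if __name__ == "__main__":
    # Toy invocation; the real pipeline feeds tile rows from create_tiles().
    print(run([1, 2, 3], polygon="aoi", filters={"is_pano": True}))

Because ProcessPoolExecutor pickles the callable and each record to ship them to worker processes, the worker must be a top-level function and partial is used to bind the shared polygon/filter arguments, which is the shape of parallelized_processing in this commit.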

File tree

2 files changed: +106 -154 lines changed

mapswipe_workers/mapswipe_workers/utils/process_mapillary.py

Lines changed: 75 additions & 107 deletions
@@ -1,18 +1,11 @@
 import os
-from concurrent.futures import ThreadPoolExecutor, as_completed
+from concurrent.futures import ProcessPoolExecutor
+from functools import partial

 import mercantile
 import pandas as pd
 import requests
-from shapely import (
-    LineString,
-    MultiLineString,
-    MultiPolygon,
-    Point,
-    Polygon,
-    box,
-    unary_union,
-)
+from shapely import MultiPolygon, Point, Polygon, box, unary_union
 from shapely.geometry import shape
 from vt2geojson import tools as vt2geojson_tools

@@ -44,7 +37,7 @@ def create_tiles(polygon, level):
     return tiles


-def download_and_process_tile(row, attempt_limit=3):
+def download_and_process_tile(row, polygon, kwargs, attempt_limit=3):
     z = row["z"]
     x = row["x"]
     y = row["y"]
@@ -53,37 +46,24 @@ def download_and_process_tile(row, attempt_limit=3):
     attempt = 0
     while attempt < attempt_limit:
         try:
-            r = requests.get(url)
-            assert r.status_code == 200, r.content
-            features = vt2geojson_tools.vt_bytes_to_geojson(r.content, x, y, z).get(
-                "features", []
-            )
-            data = []
-            for feature in features:
-                geometry = feature.get("geometry", {})
-                properties = feature.get("properties", {})
-                geometry_type = geometry.get("type", None)
-                coordinates = geometry.get("coordinates", [])
-
-                element_geometry = None
-                if geometry_type == "Point":
-                    element_geometry = Point(coordinates)
-                elif geometry_type == "LineString":
-                    element_geometry = LineString(coordinates)
-                elif geometry_type == "MultiLineString":
-                    element_geometry = MultiLineString(coordinates)
-                elif geometry_type == "Polygon":
-                    element_geometry = Polygon(coordinates[0])
-                elif geometry_type == "MultiPolygon":
-                    element_geometry = MultiPolygon(coordinates)
-
-                # Append the dictionary with geometry and properties
-                row = {"geometry": element_geometry, **properties}
-                data.append(row)
-
-            data = pd.DataFrame(data)
-
-            if not data.empty:
+            data = get_mapillary_data(url, x, y, z)
+            if data.isna().all().all() is False or data.empty is False:
+                data = data[data["geometry"].apply(lambda point: point.within(polygon))]
+                target_columns = [
+                    "id",
+                    "geometry",
+                    "captured_at",
+                    "is_pano",
+                    "compass_angle",
+                    "sequence",
+                    "organization_id",
+                ]
+                for col in target_columns:
+                    if col not in data.columns:
+                        data[col] = None
+                if data.isna().all().all() is False or data.empty is False:
+                    data = filter_results(data, **kwargs)
+
                 return data
         except Exception as e:
             print(f"An exception occurred while requesting a tile: {e}")
@@ -93,8 +73,28 @@ def download_and_process_tile(row, attempt_limit=3):
     return None


+def get_mapillary_data(url, x, y, z):
+    r = requests.get(url)
+    assert r.status_code == 200, r.content
+    features = vt2geojson_tools.vt_bytes_to_geojson(r.content, x, y, z).get(
+        "features", []
+    )
+    data = []
+    data.extend(
+        [
+            {
+                "geometry": Point(feature["geometry"]["coordinates"]),
+                **feature.get("properties", {}),
+            }
+            for feature in features
+            if feature.get("geometry", {}).get("type") == "Point"
+        ]
+    )
+    return pd.DataFrame(data)
+
+
 def coordinate_download(
-    polygon, level, use_concurrency=True, attempt_limit=3, workers=os.cpu_count() * 4
+    polygon, level, kwargs: dict, use_concurrency=True, workers=os.cpu_count() * 4
 ):
     tiles = create_tiles(polygon, level)

@@ -104,48 +104,32 @@ def coordinate_download(
     if not use_concurrency:
         workers = 1

-    futures = []
-    with ThreadPoolExecutor(max_workers=workers) as executor:
-        for index, row in tiles.iterrows():
-            futures.append(
-                executor.submit(download_and_process_tile, row, attempt_limit)
-            )
-
-    for future in as_completed(futures):
-        if future is not None:
-            df = future.result()
-
-            if df is not None and not df.empty:
-                downloaded_metadata.append(df)
+    downloaded_metadata = parallelized_processing(
+        downloaded_metadata, kwargs, polygon, tiles, workers
+    )
     if len(downloaded_metadata):
         downloaded_metadata = pd.concat(downloaded_metadata, ignore_index=True)
     else:
         return pd.DataFrame(downloaded_metadata)

-    target_columns = [
-        "id",
-        "geometry",
-        "captured_at",
-        "is_pano",
-        "compass_angle",
-        "sequence",
-        "organization_id",
-    ]
-    for col in target_columns:
-        if col not in downloaded_metadata.columns:
-            downloaded_metadata[col] = None
-    if (
-        downloaded_metadata.isna().all().all() is False
-        or downloaded_metadata.empty is False
-    ):
-        downloaded_metadata = downloaded_metadata[
-            downloaded_metadata["geometry"].apply(
-                lambda point: point.within(polygon)
-            )
-        ]
     return downloaded_metadata


+def parallelized_processing(data, kwargs, polygon, tiles, workers):
+    process_tile_with_args = partial(
+        download_and_process_tile, polygon=polygon, kwargs=kwargs
+    )
+    with ProcessPoolExecutor(max_workers=workers) as executor:
+        futures = list(
+            executor.map(process_tile_with_args, tiles.to_dict(orient="records"))
+        )
+
+    for df in futures:
+        if df is not None and not df.empty:
+            data.append(df)
+    return data
+
+
 def geojson_to_polygon(geojson_data):
     if geojson_data["type"] == "FeatureCollection":
         features = geojson_data["features"]
@@ -198,36 +182,31 @@ def filter_results(
         )
         return None
     df = df[df["creator_id"] == creator_id]
-
     if is_pano is not None:
         if df["is_pano"].isna().all():
             logger.exception("No Mapillary Feature in the AoI has a 'is_pano' value.")
             return None
         df = df[df["is_pano"] == is_pano]
-
     if organization_id is not None:
         if df["organization_id"].isna().all():
             logger.exception(
                 "No Mapillary Feature in the AoI has an 'organization_id' value."
             )
             return None
         df = df[df["organization_id"] == organization_id]
-
     if start_time is not None:
         if df["captured_at"].isna().all():
             logger.exception(
                 "No Mapillary Feature in the AoI has a 'captured_at' value."
             )
             return None
         df = filter_by_timerange(df, start_time, end_time)
-
     return df


 def get_image_metadata(
     aoi_geojson,
     level=14,
-    attempt_limit=3,
     is_pano: bool = None,
     creator_id: int = None,
     organization_id: str = None,
@@ -236,33 +215,22 @@ def get_image_metadata(
     randomize_order=False,
     sampling_threshold=None,
 ):
+    kwargs = {
+        "is_pano": is_pano,
+        "creator_id": creator_id,
+        "organization_id": organization_id,
+        "start_time": start_time,
+        "end_time": end_time,
+    }
     aoi_polygon = geojson_to_polygon(aoi_geojson)
-    downloaded_metadata = coordinate_download(aoi_polygon, level, attempt_limit)
-
+    downloaded_metadata = coordinate_download(aoi_polygon, level, kwargs)
     if downloaded_metadata.empty or downloaded_metadata.isna().all().all():
-        raise ValueError("No Mapillary Features in the AoI.")
-
-    downloaded_metadata = downloaded_metadata[
-        downloaded_metadata["geometry"].apply(lambda geom: isinstance(geom, Point))
-    ]
-
-    downloaded_metadata = filter_results(
-        downloaded_metadata,
-        creator_id,
-        is_pano,
-        organization_id,
-        start_time,
-        end_time,
-    )
-
-    if (
-        downloaded_metadata is None
-        or downloaded_metadata.empty
-        or downloaded_metadata.isna().all().all()
-    ):
-        raise ValueError("No Mapillary Features in the AoI match the filter criteria.")
-
-    downloaded_metadata = spatial_sampling(downloaded_metadata, sampling_threshold)
+        raise ValueError(
+            "No Mapillary Features in the AoI or no Features match the filter criteria."
+        )
+    downloaded_metadata = downloaded_metadata.drop_duplicates(subset=["geometry"])
+    if sampling_threshold is not None:
+        downloaded_metadata = spatial_sampling(downloaded_metadata, sampling_threshold)

     if randomize_order is True:
         downloaded_metadata = downloaded_metadata.sample(frac=1).reset_index(drop=True)

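For callers, get_image_metadata keeps its name but drops attempt_limit; the filter arguments travel to the workers via the kwargs dict, duplicate geometries are dropped, and spatial_sampling only runs when sampling_threshold is set. A hedged usage sketch (the AoI file name is hypothetical, and the start_time/end_time defaults are not visible in this diff):

import json

from mapswipe_workers.utils.process_mapillary import get_image_metadata

# Hypothetical AoI file containing a GeoJSON FeatureCollection with the
# project's polygon; geojson_to_polygon() is applied internally.
with open("aoi.geojson") as f:
    aoi_geojson = json.load(f)

df = get_image_metadata(
    aoi_geojson,
    level=14,
    is_pano=True,             # forwarded to filter_results() inside each worker
    sampling_threshold=None,  # None now skips spatial_sampling() entirely
)
print(df[["id", "geometry", "captured_at"]].head())
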
0 commit comments
