Skip to content

Commit bc2bcd5

Browse files
committed
feat: use ProcessPoolExecutor for download_and_process_tile
1 parent 508edf8 commit bc2bcd5

File tree

1 file changed

+59
-85
lines changed

1 file changed

+59
-85
lines changed
Lines changed: 59 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,11 @@
11
import os
2-
from concurrent.futures import ThreadPoolExecutor, as_completed
2+
from concurrent.futures import ProcessPoolExecutor
3+
from functools import partial
34

45
import mercantile
56
import pandas as pd
67
import requests
7-
from shapely import (
8-
LineString,
9-
MultiLineString,
10-
MultiPolygon,
11-
Point,
12-
Polygon,
13-
box,
14-
unary_union,
15-
)
8+
from shapely import MultiPolygon, Point, Polygon, box, unary_union
169
from shapely.geometry import shape
1710
from vt2geojson import tools as vt2geojson_tools
1811

@@ -44,7 +37,7 @@ def create_tiles(polygon, level):
4437
return tiles
4538

4639

47-
def download_and_process_tile(row, attempt_limit=3):
40+
def download_and_process_tile(row, polygon, kwargs, attempt_limit=3):
4841
z = row["z"]
4942
x = row["x"]
5043
y = row["y"]
@@ -59,31 +52,37 @@ def download_and_process_tile(row, attempt_limit=3):
5952
"features", []
6053
)
6154
data = []
62-
for feature in features:
63-
geometry = feature.get("geometry", {})
64-
properties = feature.get("properties", {})
65-
geometry_type = geometry.get("type", None)
66-
coordinates = geometry.get("coordinates", [])
67-
68-
element_geometry = None
69-
if geometry_type == "Point":
70-
element_geometry = Point(coordinates)
71-
elif geometry_type == "LineString":
72-
element_geometry = LineString(coordinates)
73-
elif geometry_type == "MultiLineString":
74-
element_geometry = MultiLineString(coordinates)
75-
elif geometry_type == "Polygon":
76-
element_geometry = Polygon(coordinates[0])
77-
elif geometry_type == "MultiPolygon":
78-
element_geometry = MultiPolygon(coordinates)
79-
80-
# Append the dictionary with geometry and properties
81-
row = {"geometry": element_geometry, **properties}
82-
data.append(row)
55+
data.extend(
56+
[
57+
{
58+
"geometry": Point(feature["geometry"]["coordinates"]),
59+
**feature.get("properties", {}),
60+
}
61+
for feature in features
62+
if feature.get("geometry", {}).get("type") == "Point"
63+
]
64+
)
8365

8466
data = pd.DataFrame(data)
8567

86-
if not data.empty:
68+
if data.isna().all().all() is False or data.empty is False:
69+
data = data[data["geometry"].apply(lambda point: point.within(polygon))]
70+
target_columns = [
71+
"id",
72+
"geometry",
73+
"captured_at",
74+
"is_pano",
75+
"compass_angle",
76+
"sequence",
77+
"organization_id",
78+
]
79+
for col in target_columns:
80+
if col not in data.columns:
81+
data[col] = None
82+
83+
if data.isna().all().all() is False or data.empty is False:
84+
data = filter_results(data, **kwargs)
85+
8786
return data
8887
except Exception as e:
8988
print(f"An exception occurred while requesting a tile: {e}")
@@ -94,7 +93,7 @@ def download_and_process_tile(row, attempt_limit=3):
9493

9594

9695
def coordinate_download(
97-
polygon, level, use_concurrency=True, attempt_limit=3, workers=os.cpu_count() * 4
96+
polygon, level, kwargs: dict, use_concurrency=True, workers=os.cpu_count() * 4
9897
):
9998
tiles = create_tiles(polygon, level)
10099

@@ -104,45 +103,22 @@ def coordinate_download(
104103
if not use_concurrency:
105104
workers = 1
106105

107-
futures = []
108-
with ThreadPoolExecutor(max_workers=workers) as executor:
109-
for index, row in tiles.iterrows():
110-
futures.append(
111-
executor.submit(download_and_process_tile, row, attempt_limit)
112-
)
113-
114-
for future in as_completed(futures):
115-
if future is not None:
116-
df = future.result()
106+
process_tile_with_args = partial(
107+
download_and_process_tile, polygon=polygon, kwargs=kwargs
108+
)
109+
with ProcessPoolExecutor(max_workers=workers) as executor:
110+
futures = list(
111+
executor.map(process_tile_with_args, tiles.to_dict(orient="records"))
112+
)
117113

118-
if df is not None and not df.empty:
119-
downloaded_metadata.append(df)
114+
for df in futures:
115+
if df is not None and not df.empty:
116+
downloaded_metadata.append(df)
120117
if len(downloaded_metadata):
121118
downloaded_metadata = pd.concat(downloaded_metadata, ignore_index=True)
122119
else:
123120
return pd.DataFrame(downloaded_metadata)
124121

125-
target_columns = [
126-
"id",
127-
"geometry",
128-
"captured_at",
129-
"is_pano",
130-
"compass_angle",
131-
"sequence",
132-
"organization_id",
133-
]
134-
for col in target_columns:
135-
if col not in downloaded_metadata.columns:
136-
downloaded_metadata[col] = None
137-
if (
138-
downloaded_metadata.isna().all().all() is False
139-
or downloaded_metadata.empty is False
140-
):
141-
downloaded_metadata = downloaded_metadata[
142-
downloaded_metadata["geometry"].apply(
143-
lambda point: point.within(polygon)
144-
)
145-
]
146122
return downloaded_metadata
147123

148124

@@ -198,74 +174,72 @@ def filter_results(
198174
)
199175
return None
200176
df = df[df["creator_id"] == creator_id]
201-
202177
if is_pano is not None:
203178
if df["is_pano"].isna().all():
179+
print(df)
204180
logger.exception("No Mapillary Feature in the AoI has a 'is_pano' value.")
205181
return None
206182
df = df[df["is_pano"] == is_pano]
207-
208183
if organization_id is not None:
209184
if df["organization_id"].isna().all():
210185
logger.exception(
211186
"No Mapillary Feature in the AoI has an 'organization_id' value."
212187
)
213188
return None
214189
df = df[df["organization_id"] == organization_id]
215-
216190
if start_time is not None:
217191
if df["captured_at"].isna().all():
218192
logger.exception(
219193
"No Mapillary Feature in the AoI has a 'captured_at' value."
220194
)
221195
return None
222196
df = filter_by_timerange(df, start_time, end_time)
223-
224197
return df
225198

226199

227200
def get_image_metadata(
228201
aoi_geojson,
229202
level=14,
230-
attempt_limit=3,
231203
is_pano: bool = None,
232204
creator_id: int = None,
233205
organization_id: str = None,
234206
start_time: str = None,
235207
end_time: str = None,
236208
sampling_threshold=None,
237209
):
210+
kwargs = {
211+
"is_pano": is_pano,
212+
"creator_id": creator_id,
213+
"organization_id": organization_id,
214+
"start_time": start_time,
215+
"end_time": end_time,
216+
}
238217
aoi_polygon = geojson_to_polygon(aoi_geojson)
239-
downloaded_metadata = coordinate_download(aoi_polygon, level, attempt_limit)
240-
218+
downloaded_metadata = coordinate_download(aoi_polygon, level, kwargs)
241219
if downloaded_metadata.empty or downloaded_metadata.isna().all().all():
242220
raise ValueError("No Mapillary Features in the AoI.")
243221

244222
downloaded_metadata = downloaded_metadata[
245223
downloaded_metadata["geometry"].apply(lambda geom: isinstance(geom, Point))
246224
]
247225

248-
filtered_metadata = filter_results(
249-
downloaded_metadata, creator_id, is_pano, organization_id, start_time, end_time
250-
)
251-
252226
if (
253-
filtered_metadata is None
254-
or filtered_metadata.empty
255-
or filtered_metadata.isna().all().all()
227+
downloaded_metadata is None
228+
or downloaded_metadata.empty
229+
or downloaded_metadata.isna().all().all()
256230
):
257231
raise ValueError("No Mapillary Features in the AoI match the filter criteria.")
258232

259233
if sampling_threshold is not None:
260-
filtered_metadata = spatial_sampling(filtered_metadata, sampling_threshold)
234+
downloaded_metadata = spatial_sampling(downloaded_metadata, sampling_threshold)
261235

262-
total_images = len(filtered_metadata)
236+
total_images = len(downloaded_metadata)
263237
if total_images > 100000:
264238
raise ValueError(
265239
f"Too many Images with selected filter options for the AoI: {total_images}"
266240
)
267241

268242
return {
269-
"ids": filtered_metadata["id"].tolist(),
270-
"geometries": filtered_metadata["geometry"].tolist(),
243+
"ids": downloaded_metadata["id"].tolist(),
244+
"geometries": downloaded_metadata["geometry"].tolist(),
271245
}

0 commit comments

Comments
 (0)