11import os
2- from concurrent .futures import ThreadPoolExecutor , as_completed
2+ from concurrent .futures import ProcessPoolExecutor
3+ from functools import partial
34
45import mercantile
56import pandas as pd
67import requests
7- from shapely import (
8- LineString ,
9- MultiLineString ,
10- MultiPolygon ,
11- Point ,
12- Polygon ,
13- box ,
14- unary_union ,
15- )
8+ from shapely import MultiPolygon , Point , Polygon , box , unary_union
169from shapely .geometry import shape
1710from vt2geojson import tools as vt2geojson_tools
1811
@@ -31,9 +24,9 @@ def create_tiles(polygon, level):
3124 if isinstance (polygon , Polygon ):
3225 polygon = MultiPolygon ([polygon ])
3326
34- tiles = []
27+ tiles = set ()
3528 for i , poly in enumerate (polygon .geoms ):
36- tiles .extend (list (mercantile .tiles (* poly .bounds , level )))
29+ tiles .update (list (mercantile .tiles (* poly .bounds , level )))
3730
3831 bbox_list = [mercantile .bounds (tile .x , tile .y , tile .z ) for tile in tiles ]
3932 bbox_polygons = [box (* bbox ) for bbox in bbox_list ]
@@ -49,7 +42,7 @@ def create_tiles(polygon, level):
4942 return tiles
5043
5144
52- def download_and_process_tile (row , attempt_limit = 3 ):
45+ def download_and_process_tile (row , polygon , kwargs , attempt_limit = 3 ):
5346 z = row ["z" ]
5447 x = row ["x" ]
5548 y = row ["y" ]
@@ -58,37 +51,24 @@ def download_and_process_tile(row, attempt_limit=3):
5851 attempt = 0
5952 while attempt < attempt_limit :
6053 try :
61- r = requests .get (url )
62- assert r .status_code == 200 , r .content
63- features = vt2geojson_tools .vt_bytes_to_geojson (r .content , x , y , z ).get (
64- "features" , []
65- )
66- data = []
67- for feature in features :
68- geometry = feature .get ("geometry" , {})
69- properties = feature .get ("properties" , {})
70- geometry_type = geometry .get ("type" , None )
71- coordinates = geometry .get ("coordinates" , [])
72-
73- element_geometry = None
74- if geometry_type == "Point" :
75- element_geometry = Point (coordinates )
76- elif geometry_type == "LineString" :
77- element_geometry = LineString (coordinates )
78- elif geometry_type == "MultiLineString" :
79- element_geometry = MultiLineString (coordinates )
80- elif geometry_type == "Polygon" :
81- element_geometry = Polygon (coordinates [0 ])
82- elif geometry_type == "MultiPolygon" :
83- element_geometry = MultiPolygon (coordinates )
84-
85- # Append the dictionary with geometry and properties
86- row = {"geometry" : element_geometry , ** properties }
87- data .append (row )
88-
89- data = pd .DataFrame (data )
90-
91- if not data .empty :
54+ data = get_mapillary_data (url , x , y , z )
55+ if data .isna ().all ().all () is False or data .empty is False :
56+ data = data [data ["geometry" ].apply (lambda point : point .within (polygon ))]
57+ target_columns = [
58+ "id" ,
59+ "geometry" ,
60+ "captured_at" ,
61+ "is_pano" ,
62+ "compass_angle" ,
63+ "sequence" ,
64+ "organization_id" ,
65+ ]
66+ for col in target_columns :
67+ if col not in data .columns :
68+ data [col ] = None
69+ if data .isna ().all ().all () is False or data .empty is False :
70+ data = filter_results (data , ** kwargs )
71+
9272 return data
9373 except Exception as e :
9474 print (f"An exception occurred while requesting a tile: { e } " )
@@ -98,8 +78,28 @@ def download_and_process_tile(row, attempt_limit=3):
9878 return None
9979
10080
def get_mapillary_data(url, x, y, z, timeout=30):
    """Download one vector tile and return its Point features as a DataFrame.

    Args:
        url: Address of the vector tile to request.
        x: Tile column, forwarded to the vector-tile decoder.
        y: Tile row, forwarded to the vector-tile decoder.
        z: Tile zoom level, forwarded to the vector-tile decoder.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled connection would block the worker forever and
            the caller's retry loop would never get a chance to run.

    Returns:
        A ``pandas.DataFrame`` with one row per feature whose geometry type
        is ``"Point"``. Each row has a ``"geometry"`` column holding a
        shapely ``Point`` plus one column per feature property. The frame is
        empty when the tile contains no Point features.

    Raises:
        requests.HTTPError: If the response status is not exactly 200.
        requests.RequestException: On connection errors or timeouts.
    """
    response = requests.get(url, timeout=timeout)

    # Explicit check instead of ``assert``: assertions are stripped when
    # Python runs with -O, which would let an error payload be decoded as a
    # vector tile.
    if response.status_code != 200:
        raise requests.HTTPError(
            f"Unexpected status {response.status_code}: {response.content!r}",
            response=response,
        )

    features = vt2geojson_tools.vt_bytes_to_geojson(response.content, x, y, z).get(
        "features", []
    )

    # Only Point features are kept; every other geometry type is dropped.
    records = [
        {
            "geometry": Point(feature["geometry"]["coordinates"]),
            **feature.get("properties", {}),
        }
        for feature in features
        if feature.get("geometry", {}).get("type") == "Point"
    ]
    return pd.DataFrame(records)
99+
100+
101101def coordinate_download (
102- polygon , level , use_concurrency = True , attempt_limit = 3 , workers = os .cpu_count () * 4
102+ polygon , level , kwargs : dict , use_concurrency = True , workers = os .cpu_count () * 4
103103):
104104 tiles = create_tiles (polygon , level )
105105
@@ -109,48 +109,32 @@ def coordinate_download(
109109 if not use_concurrency :
110110 workers = 1
111111
112- futures = []
113- with ThreadPoolExecutor (max_workers = workers ) as executor :
114- for index , row in tiles .iterrows ():
115- futures .append (
116- executor .submit (download_and_process_tile , row , attempt_limit )
117- )
118-
119- for future in as_completed (futures ):
120- if future is not None :
121- df = future .result ()
122-
123- if df is not None and not df .empty :
124- downloaded_metadata .append (df )
112+ downloaded_metadata = parallelized_processing (
113+ downloaded_metadata , kwargs , polygon , tiles , workers
114+ )
125115 if len (downloaded_metadata ):
126116 downloaded_metadata = pd .concat (downloaded_metadata , ignore_index = True )
127117 else :
128118 return pd .DataFrame (downloaded_metadata )
129119
130- target_columns = [
131- "id" ,
132- "geometry" ,
133- "captured_at" ,
134- "is_pano" ,
135- "compass_angle" ,
136- "sequence" ,
137- "organization_id" ,
138- ]
139- for col in target_columns :
140- if col not in downloaded_metadata .columns :
141- downloaded_metadata [col ] = None
142- if (
143- downloaded_metadata .isna ().all ().all () is False
144- or downloaded_metadata .empty is False
145- ):
146- downloaded_metadata = downloaded_metadata [
147- downloaded_metadata ["geometry" ].apply (
148- lambda point : point .within (polygon )
149- )
150- ]
151120 return downloaded_metadata
152121
153122
def parallelized_processing(data, kwargs, polygon, tiles, workers):
    """Fetch and process every tile in a process pool, collecting the results.

    Args:
        data: List that receives the non-empty result frames. It is extended
            in place and also returned.
        kwargs: Forwarded unchanged to ``download_and_process_tile`` as its
            ``kwargs`` argument.
        polygon: Forwarded unchanged to ``download_and_process_tile`` as its
            ``polygon`` argument.
        tiles: Table of tiles; each record from ``to_dict(orient="records")``
            is handed to ``download_and_process_tile`` as its first
            positional argument.
        workers: Maximum number of worker processes in the pool.

    Returns:
        The same ``data`` list, with every result that is neither ``None``
        nor empty appended.
    """
    # Bind the arguments shared by all tiles so the pool only has to ship the
    # per-tile record to each call.
    tile_worker = partial(
        download_and_process_tile, polygon=polygon, kwargs=kwargs
    )

    with ProcessPoolExecutor(max_workers=workers) as pool:
        tile_records = tiles.to_dict(orient="records")
        # Materialise inside the ``with`` so every task has finished before
        # the pool shuts down.
        results = list(pool.map(tile_worker, tile_records))

    # Drop failed downloads (None) and tiles that yielded no rows.
    data.extend(
        frame for frame in results if frame is not None and not frame.empty
    )
    return data
136+
137+
154138def geojson_to_polygon (geojson_data ):
155139 if geojson_data ["type" ] == "FeatureCollection" :
156140 features = geojson_data ["features" ]
@@ -201,72 +185,64 @@ def filter_results(
201185 logger .info ("No Mapillary Feature in the AoI has a 'creator_id' value." )
202186 return None
203187 df = df [df ["creator_id" ] == creator_id ]
204-
205188 if is_pano is not None :
206189 if df ["is_pano" ].isna ().all ():
207190 logger .info ("No Mapillary Feature in the AoI has a 'is_pano' value." )
208191 return None
209192 df = df [df ["is_pano" ] == is_pano ]
210-
211193 if organization_id is not None :
212194 if df ["organization_id" ].isna ().all ():
213195 logger .info (
214196 "No Mapillary Feature in the AoI has an 'organization_id' value."
215197 )
216198 return None
217199 df = df [df ["organization_id" ] == organization_id ]
218-
219200 if start_time is not None :
220201 if df ["captured_at" ].isna ().all ():
221202 logger .info ("No Mapillary Feature in the AoI has a 'captured_at' value." )
222203 return None
223204 df = filter_by_timerange (df , start_time , end_time )
224-
225205 return df
226206
227207
228208def get_image_metadata (
229209 aoi_geojson ,
230210 level = 14 ,
231- attempt_limit = 3 ,
232211 is_pano : bool = None ,
233212 creator_id : int = None ,
234213 organization_id : str = None ,
235214 start_time : str = None ,
236215 end_time : str = None ,
216+ randomize_order = False ,
237217 sampling_threshold = None ,
238218):
219+ kwargs = {
220+ "is_pano" : is_pano ,
221+ "creator_id" : creator_id ,
222+ "organization_id" : organization_id ,
223+ "start_time" : start_time ,
224+ "end_time" : end_time ,
225+ }
239226 aoi_polygon = geojson_to_polygon (aoi_geojson )
240- downloaded_metadata = coordinate_download (aoi_polygon , level , attempt_limit )
241-
227+ downloaded_metadata = coordinate_download (aoi_polygon , level , kwargs )
242228 if downloaded_metadata .empty or downloaded_metadata .isna ().all ().all ():
243- raise CustomError ("No Mapillary Features in the AoI." )
244-
245- downloaded_metadata = downloaded_metadata [
246- downloaded_metadata ["geometry" ].apply (lambda geom : isinstance (geom , Point ))
247- ]
248-
249- filtered_metadata = filter_results (
250- downloaded_metadata , creator_id , is_pano , organization_id , start_time , end_time
251- )
252-
253- if (
254- filtered_metadata is None
255- or filtered_metadata .empty
256- or filtered_metadata .isna ().all ().all ()
257- ):
258- raise CustomError ("No Mapillary Features in the AoI match the filter criteria." )
259-
229+ raise CustomError (
230+ "No Mapillary Features in the AoI or no Features match the filter criteria."
231+ )
232+ downloaded_metadata = downloaded_metadata .drop_duplicates (subset = ["geometry" ])
260233 if sampling_threshold is not None :
261- filtered_metadata = spatial_sampling (filtered_metadata , sampling_threshold )
234+ downloaded_metadata = spatial_sampling (downloaded_metadata , sampling_threshold )
235+
236+ if randomize_order is True :
237+ downloaded_metadata = downloaded_metadata .sample (frac = 1 ).reset_index (drop = True )
262238
263- total_images = len (filtered_metadata )
239+ total_images = len (downloaded_metadata )
264240 if total_images > 100000 :
265241 raise CustomError (
266242 f"Too many Images with selected filter options for the AoI: { total_images } "
267243 )
268244
269245 return {
270- "ids" : filtered_metadata ["id" ].tolist (),
271- "geometries" : filtered_metadata ["geometry" ].tolist (),
246+ "ids" : downloaded_metadata ["id" ].tolist (),
247+ "geometries" : downloaded_metadata ["geometry" ].tolist (),
272248 }
0 commit comments