 use arrow::array::RecordBatch;
 use chrono::NaiveDate;
 use futures::stream::{self, StreamExt};
+use futures::{TryFutureExt, TryStreamExt};
 use itertools::Itertools;
 use object_store::{path::Path, ListResult, ObjectMeta, ObjectStore};
-use parquet::arrow::arrow_reader::ArrowPredicate;
-use parquet::arrow::arrow_reader::ArrowReaderOptions;
-use parquet::arrow::async_reader::ParquetObjectReader;
-use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
 use rayon::prelude::*;
 use std::sync::Arc;
 use std::time::Instant;
 
+use crate::collection::collector_ops::{process_meta_obj_into_tasks, RowGroupTask};
 use crate::collection::record::TransportationConnectorRecord;
 use crate::collection::record::TransportationSegmentRecord;
 use crate::collection::BuildingsRecord;
@@ -29,7 +27,8 @@ use super::RowFilterConfig;
 #[derive(Debug)]
 pub struct OvertureMapsCollector {
     obj_store: Arc<dyn ObjectStore>,
-    batch_size: usize,
+    rg_chunk_size: usize,
+    file_concurrency_limit: usize,
 }
 
 impl TryFrom<OvertureMapsCollectorConfig> for OvertureMapsCollector {
@@ -41,10 +40,15 @@ impl TryFrom<OvertureMapsCollectorConfig> for OvertureMapsCollector {
 }
 
 impl OvertureMapsCollector {
-    pub fn new(object_store: Arc<dyn ObjectStore>, batch_size: usize) -> Self {
+    pub fn new(
+        object_store: Arc<dyn ObjectStore>,
+        rg_chunk_size: usize,
+        file_concurrency_limit: usize,
+    ) -> Self {
         Self {
             obj_store: object_store,
-            batch_size,
+            rg_chunk_size,
+            file_concurrency_limit,
         }
     }
 
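The single `batch_size` knob becomes two scheduling knobs: `rg_chunk_size` bounds how many row groups a single task covers, and `file_concurrency_limit` bounds how many files are processed at once. A minimal construction sketch, assuming `object_store`'s in-memory backend and arbitrary values (not part of this diff):

```rust
use std::sync::Arc;
use object_store::{memory::InMemory, ObjectStore};

// Arbitrary tuning values: 4 row groups per task, up to 64 files in flight.
let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
let collector = OvertureMapsCollector::new(store, 4, 64);
```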
@@ -121,69 +125,68 @@ impl OvertureMapsCollector {
             .map_err(|e| OvertureMapsCollectionError::MetadataError(e.to_string()))?;
 
         // Prepare the filter predicates
+        let opt_bbox_filter = row_filter_config
+            .as_ref()
+            .and_then(|f| f.get_bbox_filter_if_exists());
+
+        // Validate the provided bbox
+        if let Some(bbox) = opt_bbox_filter.as_ref() {
+            bbox.validate()?
+        };
+
+        // Build the rest of the filters
         let row_filter = if let Some(row_filter_config) = &row_filter_config {
+            row_filter_config.validate_unique_variant()?;
             Some(RowFilter::try_from(row_filter_config.clone())?)
         } else {
             None
         };
-        // Instantiate Stream Builders
-        let mut streams = vec![];
-        for meta in meta_objects {
-            log::debug!("File Name: {}, Size: {}", meta.location, meta.size);
-
-            // Parquet objects in charge of processing the incoming stream
-            let opts = ArrowReaderOptions::new().with_page_index(true);
-            let reader = ParquetObjectReader::new(self.obj_store.clone(), meta.location)
-                .with_runtime(io_runtime.handle().clone());
-            let builder = runtime
-                .block_on(ParquetRecordBatchStreamBuilder::new_with_options(
-                    reader, opts,
-                ))
-                .map_err(|e| OvertureMapsCollectionError::ArrowReaderError { source: e })?;
-
-            // Implement the required query filters
-            // For this we need the schema of each file, so we get that from the builder
-            let parquet_metadata = builder.metadata().file_metadata();
-
-            // Build Arrow filters from RowFilter enum
-            let predicates: Vec<Box<dyn ArrowPredicate>> = if let Some(filter) = &row_filter {
-                filter.build(parquet_metadata)?
-            } else {
-                vec![]
-            };
-
-            let row_filter = parquet::arrow::arrow_reader::RowFilter::new(predicates);
-
-            // Build stream object
-            let stream: parquet::arrow::async_reader::ParquetRecordBatchStream<
-                ParquetObjectReader,
-            > = builder
-                .with_row_filter(row_filter)
-                .with_batch_size(self.batch_size)
-                .build()
-                .map_err(
-                    |e| OvertureMapsCollectionError::ParquetRecordBatchStreamError { source: e },
-                )?;
-
-            streams.push(stream);
-        }
-
         log::info!("Started collection");
         let start_collection = Instant::now();
-        let result_vec = runtime.block_on(
+        // Process all metadata objects into a flat vector of tasks, each
+        // covering a small number of row groups. `process_meta_obj_into_tasks`
+        // also prunes row groups against the provided bounding box.
+        let row_group_tasks: Vec<RowGroupTask> = runtime.block_on(async {
+            Ok(stream::iter(meta_objects)
+                .map(|meta| {
+                    process_meta_obj_into_tasks(
+                        meta,
+                        self.obj_store.clone(),
+                        Some(io_runtime.handle().clone()),
+                        opt_bbox_filter,
+                        Some(self.rg_chunk_size),
+                    )
+                })
+                .buffer_unordered(self.file_concurrency_limit)
+                .try_collect::<Vec<Vec<RowGroupTask>>>()
+                .await?
+                .into_iter()
+                .flatten()
+                .collect())
+        })?;
+
+        // Build and collect streams
+        let streams = row_group_tasks
+            .into_iter()
+            .map(|rgt| {
+                rgt.build_stream(
+                    row_filter.as_ref(),
+                    self.obj_store.clone(),
+                    io_runtime.handle().clone(),
+                )
+            })
+            .collect::<Result<Vec<_>, OvertureMapsCollectionError>>()?;
+
+        let record_batches = runtime.block_on(
             stream::iter(streams)
-                .flatten_unordered(None)
-                .collect::<Vec<_>>(),
-        );
+                .flatten_unordered(self.file_concurrency_limit)
+                .try_collect::<Vec<RecordBatch>>()
+                .map_err(|e| OvertureMapsCollectionError::RecordBatchRetrievalError { source: e }),
+        )?;
         log::info!("Collection time {:?}", start_collection.elapsed());
 
-        // Unpack record batches
-        let record_batches: Vec<RecordBatch> = result_vec
-            .into_iter()
-            .collect::<Result<Vec<RecordBatch>, _>>()
-            .map_err(|e| OvertureMapsCollectionError::RecordBatchRetrievalError { source: e })?;
-
+        // Deserialize the batches into Records
         let start_deserialization = Instant::now();
         let records: Vec<OvertureRecord> = record_batches
             .par_iter()
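The hunk above relies on futures' bounded fan-out: `map` turns each input into a future, `buffer_unordered(n)` polls at most `n` of them concurrently, and `try_collect` fails fast on the first error. A self-contained sketch of the same pattern, with a hypothetical `expand` standing in for `process_meta_obj_into_tasks` and a tokio runtime assumed:

```rust
use futures::stream::{self, StreamExt, TryStreamExt};

// Stand-in for `process_meta_obj_into_tasks`: one input fans out into
// several tasks, or fails.
async fn expand(n: u32) -> Result<Vec<u32>, String> {
    Ok((0..n).collect())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    let limit = 4; // plays the role of `file_concurrency_limit`
    let tasks: Vec<u32> = stream::iter(1..=8u32)
        .map(expand)                    // stream of futures
        .buffer_unordered(limit)        // at most `limit` in flight at once
        .try_collect::<Vec<Vec<u32>>>() // per-input task lists; first Err aborts
        .await?
        .into_iter()
        .flatten()                      // flatten into one task list
        .collect();
    println!("{} tasks", tasks.len());
    Ok(())
}
```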
@@ -235,7 +238,7 @@ mod test {
     use std::str::FromStr;
 
     fn get_collector() -> OvertureMapsCollector {
-        OvertureMapsCollectorConfig::new(ObjectStoreSource::AmazonS3, 512)
+        OvertureMapsCollectorConfig::new(ObjectStoreSource::AmazonS3, Some(4), Some(64))
             .build()
             .unwrap()
     }
@@ -257,7 +260,7 @@ mod test {
         let connector_records = collector
             .collect_from_release(
                 ReleaseVersion::Monthly {
-                    datetime: NaiveDate::from_str("2025-11-19").unwrap(),
+                    datetime: NaiveDate::from_str("2025-12-17").unwrap(),
                     version: Some(0),
                 },
                 &OvertureRecordType::Connector,
@@ -267,7 +270,7 @@ mod test {
 
         println!("Records Length: {}", connector_records.len());
 
-        assert_eq!(connector_records.len(), 6401);
+        assert_eq!(connector_records.len(), 6436);
         assert!(matches!(
             connector_records[0],
             OvertureRecord::Connector(..)
@@ -277,7 +280,7 @@ mod test {
         let segment_records = collector
             .collect_from_release(
                 ReleaseVersion::Monthly {
-                    datetime: NaiveDate::from_str("2025-11-19").unwrap(),
+                    datetime: NaiveDate::from_str("2025-12-17").unwrap(),
                     version: Some(0),
                 },
                 &OvertureRecordType::Segment,
 
         println!("Records Length: {}", segment_records.len());
 
-        assert_eq!(segment_records.len(), 3804);
+        assert_eq!(segment_records.len(), 3771);
         assert!(matches!(segment_records[0], OvertureRecord::Segment(..)));
     }
 }
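`process_meta_obj_into_tasks` itself is not shown in this diff; a hypothetical illustration of the `rg_chunk_size` idea, splitting one file's row-group indices into fixed-size chunks with itertools (the `Task` type and names here are invented):

```rust
use itertools::Itertools;

#[derive(Debug)]
struct Task {
    file: String,
    row_groups: Vec<usize>,
}

// Split a file's row-group indices into chunks of `rg_chunk_size`,
// producing one task per chunk.
fn chunk_row_groups(file: &str, num_row_groups: usize, rg_chunk_size: usize) -> Vec<Task> {
    let chunks = (0..num_row_groups).chunks(rg_chunk_size);
    chunks
        .into_iter()
        .map(|chunk| Task {
            file: file.to_string(),
            row_groups: chunk.collect(),
        })
        .collect()
}

fn main() {
    // 10 row groups in chunks of 4 -> tasks over [0..4), [4..8), [8..10)
    for task in chunk_row_groups("part-00000.parquet", 10, 4) {
        println!("{task:?}");
    }
}
```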