13 changes: 10 additions & 3 deletions rust/bambam-omf/src/app/network.rs
@@ -77,8 +77,14 @@ fn run_collector(
bbox_arg: Option<&CliBoundingBox>,
) -> Result<TransportationCollection, OvertureMapsCollectionError> {
let object_store = ObjectStoreSource::AmazonS3;
let batch_size = 128;
let collector = OvertureMapsCollectorConfig::new(object_store, batch_size).build()?;
let rg_chunk_size = 4;
let file_concurrency_limit = 64;
let collector = OvertureMapsCollectorConfig::new(
object_store,
Some(rg_chunk_size),
Some(file_concurrency_limit),
)
.build()?;
let release = ReleaseVersion::Latest;
let bbox = bbox_arg.ok_or_else(|| {
let msg = String::from("must provide bbox argument for download");
@@ -87,7 +93,8 @@ fn run_collector(
log::info!(
"running OMF import with
object store {object_store:?}
batch size {batch_size}
rg_chunk_size {rg_chunk_size}
file_concurrency_limit {file_concurrency_limit}
release {release}
(xmin,xmax,ymin,ymax): {bbox}"
);
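Both tuning knobs are optional at the config level: build() falls back to rg_chunk_size = 4 and file_concurrency_limit = 64 when None is passed (see collector_config.rs below). A minimal sketch of a call site that takes the defaults, assuming these types are importable from the crate's collection module:

use bambam_omf::collection::{
    ObjectStoreSource, OvertureMapsCollectionError, OvertureMapsCollector,
    OvertureMapsCollectorConfig,
};

fn make_default_collector() -> Result<OvertureMapsCollector, OvertureMapsCollectionError> {
    // None for both knobs; build() substitutes 4 and 64.
    OvertureMapsCollectorConfig::new(ObjectStoreSource::AmazonS3, None, None).build()
}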
131 changes: 67 additions & 64 deletions rust/bambam-omf/src/collection/collector.rs
@@ -1,16 +1,14 @@
use arrow::array::RecordBatch;
use chrono::NaiveDate;
use futures::stream::{self, StreamExt};
use futures::{TryFutureExt, TryStreamExt};
use itertools::Itertools;
use object_store::{path::Path, ListResult, ObjectMeta, ObjectStore};
use parquet::arrow::arrow_reader::ArrowPredicate;
use parquet::arrow::arrow_reader::ArrowReaderOptions;
use parquet::arrow::async_reader::ParquetObjectReader;
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
use rayon::prelude::*;
use std::sync::Arc;
use std::time::Instant;

use crate::collection::collector_ops::{process_meta_obj_into_tasks, RowGroupTask};
use crate::collection::record::TransportationConnectorRecord;
use crate::collection::record::TransportationSegmentRecord;
use crate::collection::BuildingsRecord;
@@ -29,7 +27,8 @@ use super::RowFilterConfig;
#[derive(Debug)]
pub struct OvertureMapsCollector {
obj_store: Arc<dyn ObjectStore>,
batch_size: usize,
rg_chunk_size: usize,
file_concurrency_limit: usize,
}

impl TryFrom<OvertureMapsCollectorConfig> for OvertureMapsCollector {
@@ -41,10 +40,15 @@ impl TryFrom<OvertureMapsCollectorConfig> for OvertureMapsCollector {
}

impl OvertureMapsCollector {
pub fn new(object_store: Arc<dyn ObjectStore>, batch_size: usize) -> Self {
pub fn new(
object_store: Arc<dyn ObjectStore>,
rg_chunk_size: usize,
file_concurrency_limit: usize,
) -> Self {
Self {
obj_store: object_store,
batch_size,
rg_chunk_size,
file_concurrency_limit,
}
}

@@ -121,69 +125,68 @@ impl OvertureMapsCollector {
.map_err(|e| OvertureMapsCollectionError::MetadataError(e.to_string()))?;

// Prepare the filter predicates
let opt_bbox_filter = row_filter_config
.as_ref()
.and_then(|f| f.get_bbox_filter_if_exists());

// validate provided bbox
if let Some(bbox) = opt_bbox_filter.as_ref() {
bbox.validate()?
};

// build the rest of the filters
let row_filter = if let Some(row_filter_config) = &row_filter_config {
row_filter_config.validate_unique_variant()?;
Some(RowFilter::try_from(row_filter_config.clone())?)
} else {
None
};

// Instantiate Stream Builders
let mut streams = vec![];
for meta in meta_objects {
log::debug!("File Name: {}, Size: {}", meta.location, meta.size);

// Parquet objects in charge of processing the incoming stream
let opts = ArrowReaderOptions::new().with_page_index(true);
let reader = ParquetObjectReader::new(self.obj_store.clone(), meta.location)
.with_runtime(io_runtime.handle().clone());
let builder = runtime
.block_on(ParquetRecordBatchStreamBuilder::new_with_options(
reader, opts,
))
.map_err(|e| OvertureMapsCollectionError::ArrowReaderError { source: e })?;

// Implement the required query filters
// For this we need the schema of each file, so we get that from the builder
let parquet_metadata = builder.metadata().file_metadata();

// Build Arrow filters from RowFilter enum
let predicates: Vec<Box<dyn ArrowPredicate>> = if let Some(filter) = &row_filter {
filter.build(parquet_metadata)?
} else {
vec![]
};

let row_filter = parquet::arrow::arrow_reader::RowFilter::new(predicates);

// Build stream object
let stream: parquet::arrow::async_reader::ParquetRecordBatchStream<
ParquetObjectReader,
> = builder
.with_row_filter(row_filter)
.with_batch_size(self.batch_size)
.build()
.map_err(
|e| OvertureMapsCollectionError::ParquetRecordBatchStreamError { source: e },
)?;

streams.push(stream);
}

log::info!("Started collection");
let start_collection = Instant::now();
let result_vec = runtime.block_on(
// Process all metadata objects into a flat vector of tasks that
// each cover a small number of row groups. Inside the `process_meta_obj_into_tasks`
// function we also prune row groups against the bounding box.
let row_group_tasks: Vec<RowGroupTask> = runtime.block_on(async {
Ok(stream::iter(meta_objects)
.map(|meta| {
process_meta_obj_into_tasks(
meta,
self.obj_store.clone(),
Some(io_runtime.handle().clone()),
opt_bbox_filter,
Some(self.rg_chunk_size),
)
})
.buffer_unordered(self.file_concurrency_limit)
.try_collect::<Vec<Vec<RowGroupTask>>>()
.await?
.into_iter()
.flatten()
.collect())
})?;

// Build and collect streams
let streams = row_group_tasks
.into_iter()
.map(|rgt| {
rgt.build_stream(
row_filter.as_ref(),
self.obj_store.clone(),
io_runtime.handle().clone(),
)
})
.collect::<Result<Vec<_>, OvertureMapsCollectionError>>()?;

let record_batches = runtime.block_on(
stream::iter(streams)
.flatten_unordered(None)
.collect::<Vec<_>>(),
);
.flatten_unordered(self.file_concurrency_limit)
.try_collect::<Vec<RecordBatch>>()
.map_err(|e| OvertureMapsCollectionError::RecordBatchRetrievalError { source: e }),
)?;
log::info!("Collection time {:?}", start_collection.elapsed());

// Unpack record batches
let record_batches: Vec<RecordBatch> = result_vec
.into_iter()
.collect::<Result<Vec<RecordBatch>, _>>()
.map_err(|e| OvertureMapsCollectionError::RecordBatchRetrievalError { source: e })?;

// Deserialize the batches into Records
let start_deserialization = Instant::now();
let records: Vec<OvertureRecord> = record_batches
.par_iter()
@@ -235,7 +238,7 @@ mod test {
use std::str::FromStr;

fn get_collector() -> OvertureMapsCollector {
OvertureMapsCollectorConfig::new(ObjectStoreSource::AmazonS3, 512)
OvertureMapsCollectorConfig::new(ObjectStoreSource::AmazonS3, Some(4), Some(64))
.build()
.unwrap()
}
@@ -257,7 +260,7 @@ fn run_collector(
let connector_records = collector
.collect_from_release(
ReleaseVersion::Monthly {
datetime: NaiveDate::from_str("2025-11-19").unwrap(),
datetime: NaiveDate::from_str("2025-12-17").unwrap(),
version: Some(0),
},
&OvertureRecordType::Connector,
@@ -267,7 +270,7 @@

println!("Records Length: {}", connector_records.len());

assert_eq!(connector_records.len(), 6401);
assert_eq!(connector_records.len(), 6436);
assert!(matches!(
connector_records[0],
OvertureRecord::Connector(..)
@@ -277,7 +280,7 @@
let segment_records = collector
.collect_from_release(
ReleaseVersion::Monthly {
datetime: NaiveDate::from_str("2025-11-19").unwrap(),
datetime: NaiveDate::from_str("2025-12-17").unwrap(),
version: Some(0),
},
&OvertureRecordType::Segment,
Expand All @@ -287,7 +290,7 @@ mod test {

println!("Records Length: {}", segment_records.len());

assert_eq!(segment_records.len(), 3804);
assert_eq!(segment_records.len(), 3771);
assert!(matches!(segment_records[0], OvertureRecord::Segment(..)));
}
}
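The heart of this change is a two-stage bounded-concurrency pipeline: buffer_unordered caps how many files are expanded into RowGroupTasks at once, and flatten_unordered caps how many record-batch streams are polled at once, both governed by file_concurrency_limit. A standalone sketch of that pattern using plain integers in place of parquet streams (futures crate only; all names here are illustrative, not from the PR):

use futures::executor::block_on;
use futures::stream::{self, StreamExt, TryStreamExt};

fn main() -> Result<(), String> {
    let file_concurrency_limit = 64;

    // Stage 1: expand each "file" into a Vec of tasks, with at most
    // file_concurrency_limit expansions in flight, then flatten.
    let tasks: Vec<u32> = block_on(
        stream::iter(0u32..8)
            .map(|file_id| async move { Ok::<_, String>(vec![file_id, file_id + 100]) })
            .buffer_unordered(file_concurrency_limit)
            .try_collect::<Vec<Vec<u32>>>(),
    )?
    .into_iter()
    .flatten()
    .collect();

    // Stage 2: each task yields its own stream of "record batches";
    // poll up to file_concurrency_limit of those streams at a time.
    let batches: Vec<u32> = block_on(
        stream::iter(tasks.into_iter().map(|t| stream::iter([t, t + 1])))
            .flatten_unordered(file_concurrency_limit)
            .collect(),
    );

    println!("collected {} batches", batches.len());
    Ok(())
}

The old code's flatten_unordered(None) polled every stream at once; passing the limit keeps the number of concurrently open object-store reads predictable.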
21 changes: 15 additions & 6 deletions rust/bambam-omf/src/collection/collector_config.rs
@@ -9,31 +9,40 @@ use super::OvertureMapsCollector;
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
pub struct OvertureMapsCollectorConfig {
obj_store_type: ObjectStoreSource,
batch_size: usize,
// Number of row groups to schedule for each task. Defaults to 4
rg_chunk_size: Option<usize>,
// Limit on the number of files to process simultaneously. Defaults to 64
file_concurrency_limit: Option<usize>,
}

impl Default for OvertureMapsCollectorConfig {
fn default() -> Self {
Self {
obj_store_type: ObjectStoreSource::AmazonS3,
batch_size: 4096 * 32,
rg_chunk_size: Some(4),
file_concurrency_limit: Some(64),
}
}
}

impl OvertureMapsCollectorConfig {
pub fn new(obj_store_type: ObjectStoreSource, batch_size: usize) -> Self {
pub fn new(
obj_store_type: ObjectStoreSource,
rg_chunk_size: Option<usize>,
file_concurrency_limit: Option<usize>,
) -> Self {
Self {
obj_store_type,
batch_size,
rg_chunk_size,
file_concurrency_limit,
}
}

pub fn build(&self) -> Result<OvertureMapsCollector, OvertureMapsCollectionError> {
Ok(OvertureMapsCollector::new(
self.obj_store_type.build()?,
// self.row_filter_config.clone(),
self.batch_size,
self.rg_chunk_size.unwrap_or(4),
self.file_concurrency_limit.unwrap_or(64),
))
}
}
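Since both new fields are Option<usize>, serde treats them as optional on deserialization (a missing key becomes None), and unknown keys such as the retired batch_size are ignored because the struct does not opt into deny_unknown_fields. A sketch of that behavior, assuming serde_json is available and that ObjectStoreSource::AmazonS3 uses serde's default variant naming:

use bambam_omf::collection::OvertureMapsCollectorConfig;

fn parse_legacy_config() -> serde_json::Result<OvertureMapsCollectorConfig> {
    // rg_chunk_size and file_concurrency_limit are absent, so they come
    // back as None; build() later substitutes 4 and 64. The stale
    // batch_size key is silently ignored.
    serde_json::from_str(r#"{ "obj_store_type": "AmazonS3", "batch_size": 4096 }"#)
}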