datafusion/core/tests/parquet/mod.rs: 52 changes (47 additions, 5 deletions)
@@ -150,6 +150,11 @@ impl TestOutput {
self.metric_value("row_groups_matched_statistics")
}

/// The number of row_groups fully matched by statistics
fn row_groups_fully_matched_statistics(&self) -> Option<usize> {
self.metric_value("row_groups_fully_matched_statistics")
}

/// The number of row_groups pruned by statistics
fn row_groups_pruned_statistics(&self) -> Option<usize> {
self.metric_value("row_groups_pruned_statistics")
@@ -178,6 +183,11 @@ impl TestOutput {
self.metric_value("page_index_rows_pruned")
}

/// The number of row groups pruned by limit pruning
fn limit_pruned_row_groups(&self) -> Option<usize> {
self.metric_value("limit_pruned_row_groups")
}
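
The two new accessors follow the existing pattern of reading a named metric from the scan output, so a test can assert on fully matched and limit-pruned row group counts directly. A minimal sketch of such an assertion, assuming the file's existing query helper and table name t (scenario, query, and expected counts are illustrative, not taken from this diff):

// Hypothetical test body; the real counts depend on the data layout.
let output = ContextWithParquet::new(Scenario::Int32, Unit::RowGroup(5))
    .await
    .query("SELECT i FROM t WHERE i > 0 LIMIT 10")
    .await;
// Each accessor returns None if the scan never reported that metric.
assert!(output.row_groups_fully_matched_statistics().is_some());
assert!(output.limit_pruned_row_groups().is_some());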

fn description(&self) -> String {
format!(
"Input:\n{}\nQuery:\n{}\nOutput:\n{}\nMetrics:\n{}",
@@ -191,20 +201,41 @@ impl TestOutput {
/// and the appropriate scenario
impl ContextWithParquet {
async fn new(scenario: Scenario, unit: Unit) -> Self {
- Self::with_config(scenario, unit, SessionConfig::new()).await
+ Self::with_config(scenario, unit, SessionConfig::new(), None, None).await
}

/// Set custom schema and batches for the test
pub async fn with_custom_data(
scenario: Scenario,
unit: Unit,
schema: Arc<Schema>,
batches: Vec<RecordBatch>,
) -> Self {
Self::with_config(
scenario,
unit,
SessionConfig::new(),
Some(schema),
Some(batches),
)
.await
}

async fn with_config(
scenario: Scenario,
unit: Unit,
mut config: SessionConfig,
custom_schema: Option<Arc<Schema>>,
custom_batches: Option<Vec<RecordBatch>>,
) -> Self {
// Use a single partition for deterministic results no matter how many CPUs the host has
config = config.with_target_partitions(1);
let file = match unit {
Unit::RowGroup(row_per_group) => {
config = config.with_parquet_bloom_filter_pruning(true);
- make_test_file_rg(scenario, row_per_group).await
+ config.options_mut().execution.parquet.pushdown_filters = true;
+ make_test_file_rg(scenario, row_per_group, custom_schema, custom_batches)
+     .await
}
Unit::Page(row_per_page) => {
config = config.with_parquet_page_index_pruning(true);
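
With with_custom_data, a pruning test can supply its own schema and row layout instead of the canned scenario data. A sketch of a call site using standard arrow-rs APIs (the column name, values, and scenario are illustrative assumptions):

use std::sync::Arc;
use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)]));
let batch = RecordBatch::try_new(
    Arc::clone(&schema),
    vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
)
.unwrap();
// Unit::RowGroup(2) splits the three rows into two row groups.
let ctx = ContextWithParquet::with_custom_data(
    Scenario::Int32, // the scenario still drives query setup; the name is illustrative
    Unit::RowGroup(2),
    schema,
    vec![batch],
)
.await;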
@@ -1030,7 +1061,12 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
}

/// Create a test parquet file with various data types
- async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTempFile {
+ async fn make_test_file_rg(
+     scenario: Scenario,
+     row_per_group: usize,
+     custom_schema: Option<Arc<Schema>>,
+     custom_batches: Option<Vec<RecordBatch>>,
+ ) -> NamedTempFile {
let mut output_file = tempfile::Builder::new()
.prefix("parquet_pruning")
.suffix(".parquet")
@@ -1043,8 +1079,14 @@ async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTempFile {
.set_statistics_enabled(EnabledStatistics::Page)
.build();

- let batches = create_data_batch(scenario);
- let schema = batches[0].schema();
+ let (batches, schema) =
+     if let (Some(schema), Some(batches)) = (custom_schema, custom_batches) {
+         (batches, schema)
+     } else {
+         let batches = create_data_batch(scenario);
+         let schema = batches[0].schema();
+         (batches, schema)
+     };

let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap();
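
Both options must be Some for the caller-supplied data to be used; otherwise the function falls back to create_data_batch, which keeps every existing call site working unchanged. Hypothetical call sites for contrast:

// Existing tests: data generated from the scenario, as before.
let file = make_test_file_rg(scenario, 5, None, None).await;
// New-style tests: caller-supplied schema and batches.
let file = make_test_file_rg(scenario, 5, Some(schema), Some(batches)).await;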
