Skip to content

Commit ca904b3

Browse files
authored
Row group limit pruning for row groups that entirely match predicates (apache#18868)
## Which issue does this PR close? <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes apache#123` indicates that this PR will close issue apache#123. --> - Part of apache#18860 ## Rationale for this change <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> See apache#18860 (comment) ## What changes are included in this PR? <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> 1. Decide when we can do limit pruning without changing the SQL semantics. 2. Add logic to decide if a row group is fully matched, i.e. all rows in the row group match the predicate. 3. Use the fully matched row groups to return the limit rows. ## Are these changes tested? <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 3. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> Yes ## Are there any user-facing changes? <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> No; no new configs or API changes
1 parent 5edda9b commit ca904b3

File tree

21 files changed

+958
-53
lines changed

21 files changed

+958
-53
lines changed

datafusion/core/tests/parquet/mod.rs

Lines changed: 88 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use arrow::{
3030
record_batch::RecordBatch,
3131
util::pretty::pretty_format_batches,
3232
};
33+
use arrow_schema::SchemaRef;
3334
use chrono::{Datelike, Duration, TimeDelta};
3435
use datafusion::{
3536
datasource::{TableProvider, provider_as_source},
@@ -110,6 +111,26 @@ struct ContextWithParquet {
110111
ctx: SessionContext,
111112
}
112113

114+
struct PruningMetric {
115+
total_pruned: usize,
116+
total_matched: usize,
117+
total_fully_matched: usize,
118+
}
119+
120+
impl PruningMetric {
121+
pub fn total_pruned(&self) -> usize {
122+
self.total_pruned
123+
}
124+
125+
pub fn total_matched(&self) -> usize {
126+
self.total_matched
127+
}
128+
129+
pub fn total_fully_matched(&self) -> usize {
130+
self.total_fully_matched
131+
}
132+
}
133+
113134
/// The output of running one of the test cases
114135
struct TestOutput {
115136
/// The input query SQL
@@ -127,8 +148,8 @@ struct TestOutput {
127148
impl TestOutput {
128149
/// retrieve the value of the named metric, if any
129150
fn metric_value(&self, metric_name: &str) -> Option<usize> {
130-
if let Some((pruned, _matched)) = self.pruning_metric(metric_name) {
131-
return Some(pruned);
151+
if let Some(pm) = self.pruning_metric(metric_name) {
152+
return Some(pm.total_pruned());
132153
}
133154

134155
self.parquet_metrics
@@ -141,9 +162,10 @@ impl TestOutput {
141162
})
142163
}
143164

144-
fn pruning_metric(&self, metric_name: &str) -> Option<(usize, usize)> {
165+
fn pruning_metric(&self, metric_name: &str) -> Option<PruningMetric> {
145166
let mut total_pruned = 0;
146167
let mut total_matched = 0;
168+
let mut total_fully_matched = 0;
147169
let mut found = false;
148170

149171
for metric in self.parquet_metrics.iter() {
@@ -155,12 +177,18 @@ impl TestOutput {
155177
{
156178
total_pruned += pruning_metrics.pruned();
157179
total_matched += pruning_metrics.matched();
180+
total_fully_matched += pruning_metrics.fully_matched();
181+
158182
found = true;
159183
}
160184
}
161185

162186
if found {
163-
Some((total_pruned, total_matched))
187+
Some(PruningMetric {
188+
total_pruned,
189+
total_matched,
190+
total_fully_matched,
191+
})
164192
} else {
165193
None
166194
}
@@ -172,27 +200,33 @@ impl TestOutput {
172200
}
173201

174202
/// The number of row_groups pruned / matched by bloom filter
175-
fn row_groups_bloom_filter(&self) -> Option<(usize, usize)> {
203+
fn row_groups_bloom_filter(&self) -> Option<PruningMetric> {
176204
self.pruning_metric("row_groups_pruned_bloom_filter")
177205
}
178206

179207
/// The number of row_groups matched by statistics
180208
fn row_groups_matched_statistics(&self) -> Option<usize> {
181209
self.pruning_metric("row_groups_pruned_statistics")
182-
.map(|(_pruned, matched)| matched)
210+
.map(|pm| pm.total_matched())
211+
}
212+
213+
/// The number of row_groups fully matched by statistics
214+
fn row_groups_fully_matched_statistics(&self) -> Option<usize> {
215+
self.pruning_metric("row_groups_pruned_statistics")
216+
.map(|pm| pm.total_fully_matched())
183217
}
184218

185219
/// The number of row_groups pruned by statistics
186220
fn row_groups_pruned_statistics(&self) -> Option<usize> {
187221
self.pruning_metric("row_groups_pruned_statistics")
188-
.map(|(pruned, _matched)| pruned)
222+
.map(|pm| pm.total_pruned())
189223
}
190224

191225
/// Metric `files_ranges_pruned_statistics` tracks both pruned and matched count,
192226
/// for testing purpose, here it only aggregate the `pruned` count.
193227
fn files_ranges_pruned_statistics(&self) -> Option<usize> {
194228
self.pruning_metric("files_ranges_pruned_statistics")
195-
.map(|(pruned, _matched)| pruned)
229+
.map(|pm| pm.total_pruned())
196230
}
197231

198232
/// The number of row_groups matched by bloom filter or statistics
@@ -201,22 +235,27 @@ impl TestOutput {
201235
/// filter: 7 total -> 3 matched, this function returns 3 for the final matched
202236
/// count.
203237
fn row_groups_matched(&self) -> Option<usize> {
204-
self.row_groups_bloom_filter()
205-
.map(|(_pruned, matched)| matched)
238+
self.row_groups_bloom_filter().map(|pm| pm.total_matched())
206239
}
207240

208241
/// The number of row_groups pruned
209242
fn row_groups_pruned(&self) -> Option<usize> {
210243
self.row_groups_bloom_filter()
211-
.map(|(pruned, _matched)| pruned)
244+
.map(|pm| pm.total_pruned())
212245
.zip(self.row_groups_pruned_statistics())
213246
.map(|(a, b)| a + b)
214247
}
215248

216249
/// The number of row pages pruned
217250
fn row_pages_pruned(&self) -> Option<usize> {
218251
self.pruning_metric("page_index_rows_pruned")
219-
.map(|(pruned, _matched)| pruned)
252+
.map(|pm| pm.total_pruned())
253+
}
254+
255+
/// The number of row groups pruned by limit pruning
256+
fn limit_pruned_row_groups(&self) -> Option<usize> {
257+
self.pruning_metric("limit_pruned_row_groups")
258+
.map(|pm| pm.total_pruned())
220259
}
221260

222261
fn description(&self) -> String {
@@ -232,20 +271,41 @@ impl TestOutput {
232271
/// and the appropriate scenario
233272
impl ContextWithParquet {
234273
async fn new(scenario: Scenario, unit: Unit) -> Self {
235-
Self::with_config(scenario, unit, SessionConfig::new()).await
274+
Self::with_config(scenario, unit, SessionConfig::new(), None, None).await
275+
}
276+
277+
/// Set custom schema and batches for the test
278+
pub async fn with_custom_data(
279+
scenario: Scenario,
280+
unit: Unit,
281+
schema: Arc<Schema>,
282+
batches: Vec<RecordBatch>,
283+
) -> Self {
284+
Self::with_config(
285+
scenario,
286+
unit,
287+
SessionConfig::new(),
288+
Some(schema),
289+
Some(batches),
290+
)
291+
.await
236292
}
237293

238294
async fn with_config(
239295
scenario: Scenario,
240296
unit: Unit,
241297
mut config: SessionConfig,
298+
custom_schema: Option<SchemaRef>,
299+
custom_batches: Option<Vec<RecordBatch>>,
242300
) -> Self {
243301
// Use a single partition for deterministic results no matter how many CPUs the host has
244302
config = config.with_target_partitions(1);
245303
let file = match unit {
246304
Unit::RowGroup(row_per_group) => {
247305
config = config.with_parquet_bloom_filter_pruning(true);
248-
make_test_file_rg(scenario, row_per_group).await
306+
config.options_mut().execution.parquet.pushdown_filters = true;
307+
make_test_file_rg(scenario, row_per_group, custom_schema, custom_batches)
308+
.await
249309
}
250310
Unit::Page(row_per_page) => {
251311
config = config.with_parquet_page_index_pruning(true);
@@ -1075,7 +1135,12 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
10751135
}
10761136

10771137
/// Create a test parquet file with various data types
1078-
async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTempFile {
1138+
async fn make_test_file_rg(
1139+
scenario: Scenario,
1140+
row_per_group: usize,
1141+
custom_schema: Option<SchemaRef>,
1142+
custom_batches: Option<Vec<RecordBatch>>,
1143+
) -> NamedTempFile {
10791144
let mut output_file = tempfile::Builder::new()
10801145
.prefix("parquet_pruning")
10811146
.suffix(".parquet")
@@ -1088,8 +1153,14 @@ async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTem
10881153
.set_statistics_enabled(EnabledStatistics::Page)
10891154
.build();
10901155

1091-
let batches = create_data_batch(scenario);
1092-
let schema = batches[0].schema();
1156+
let (batches, schema) =
1157+
if let (Some(schema), Some(batches)) = (custom_schema, custom_batches) {
1158+
(batches, schema)
1159+
} else {
1160+
let batches = create_data_batch(scenario);
1161+
let schema = batches[0].schema();
1162+
(batches, schema)
1163+
};
10931164

10941165
let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap();
10951166

0 commit comments

Comments
 (0)