11 changes: 4 additions & 7 deletions datafusion/core/tests/parquet/filter_pushdown.rs
@@ -632,7 +632,7 @@ async fn predicate_cache_pushdown_default() -> datafusion_common::Result<()> {
 #[tokio::test]
 async fn predicate_cache_pushdown_disable() -> datafusion_common::Result<()> {
     // Can disable the cache even with filter pushdown by setting the size to 0. In this case we
-    // expect the inner records are reported but no records are read from the cache
+    // expect no records are read from the cache and no metrics are reported
Contributor Author:
This is due to @nuno-faria's work to close ❤️

I was somewhat surprised that there are no metrics reported at all, but I think it makes sense, as the reporting is currently done only by the cache.
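
For illustration, a minimal self-contained sketch (not part of this PR) of the configuration being discussed; the option names are taken from the diff below:

    use datafusion::error::Result;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let mut config = SessionConfig::new()
            .set_bool("datafusion.execution.parquet.pushdown_filters", true);
        // Setting the cache size to 0 disables the predicate cache entirely;
        // since only the cache reports the predicate-cache metrics, EXPLAIN
        // ANALYZE output will then contain no such metrics at all.
        config.options_mut().execution.parquet.max_predicate_cache_size = Some(0);
        let ctx = SessionContext::new_with_config(config);
        // Register parquet-backed tables and run queries as usual here.
        let _ = ctx;
        Ok(())
    }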

Contributor:

Thanks @alamb for handling the upgrade.
I think we could add a test to confirm that datafusion.execution.parquet.max_predicate_cache_size now works as expected, by analyzing the explain output.

Here is a potential example:

// If pasted outside datafusion/core/tests/parquet, this assumes roughly these
// imports (most are already in scope in that test module):
use std::path::Path;

use datafusion::config::TableParquetOptions;
use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;
use regex::Regex;
use tempfile::TempDir;

#[tokio::test]
async fn test_disable_predicate_cache() {
    let mut parquet_options = TableParquetOptions::new();
    parquet_options.global.data_page_row_count_limit = 1;
    parquet_options.global.write_batch_size = 1;

    let tempdir = TempDir::new_in(Path::new(".")).unwrap();
    let path = tempdir.path().to_str().unwrap();

    let ctx = SessionContext::new();
    ctx.sql("select i from generate_series(1, 1000) t(i)")
        .await
        .unwrap()
        .write_parquet(path, DataFrameWriteOptions::new(), Some(parquet_options))
        .await
        .unwrap();

    let regex = Regex::new(r"bytes_scanned=(\d+)").unwrap();

    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.pushdown_filters", true);
    let ctx = SessionContext::new_with_config(config);

    // default: predicate cache is enabled
    ctx.register_parquet("t", path, ParquetReadOptions::default())
        .await
        .unwrap();
    let plan = ctx
        .sql("select * from t where i = 123")
        .await
        .unwrap()
        .explain(false, true)
        .unwrap()
        .to_string()
        .await
        .unwrap();
    let captures = regex.captures(&plan).unwrap();
    let bytes_scanned_default =
        captures.get(1).unwrap().as_str().parse::<usize>().unwrap();

    // disabling the predicate cache by setting the limit to 0
    ctx.sql("set datafusion.execution.parquet.max_predicate_cache_size = 0")
        .await
        .unwrap()
        .collect()
        .await
        .unwrap();
    ctx.deregister_table("t").unwrap();
    ctx.register_parquet("t", path, ParquetReadOptions::default())
        .await
        .unwrap();
    let plan = ctx
        .sql("select * from t where i = 123")
        .await
        .unwrap()
        .explain(false, true)
        .unwrap()
        .to_string()
        .await
        .unwrap();
    let captures = regex.captures(&plan).unwrap();
    let bytes_scanned_cache_disabled =
        captures.get(1).unwrap().as_str().parse::<usize>().unwrap();

    // with the cache disabled, fewer data pages should be retrieved (the predicate cache can
    // retrieve multiple data pages when their size is less than batch_size)
    assert_eq!(bytes_scanned_default, 31405);
    assert_eq!(bytes_scanned_cache_disabled, 1691);
}
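
(One possible hardening, since the exact byte counts above are tied to the specific file layout those writer settings produce: assert the relationship between the two measurements rather than the absolute values.)

    // Hypothetical, layout-insensitive variant of the final assertions: the
    // scan with the cache disabled should read strictly fewer bytes.
    assert!(bytes_scanned_cache_disabled < bytes_scanned_default);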

     let mut config = SessionConfig::new();
     config.options_mut().execution.parquet.pushdown_filters = true;
     config
@@ -641,13 +641,10 @@ async fn predicate_cache_pushdown_disable() -> datafusion_common::Result<()> {
         .parquet
         .max_predicate_cache_size = Some(0);
     let ctx = SessionContext::new_with_config(config);
+    // Since the cache is disabled, there is no reporting or use of the cache
     PredicateCacheTest {
-        // file has 8 rows, which need to be read twice, one for filter, one for
-        // final output
-        expected_inner_records: 16,
-        // Expect this to 0 records read as the cache is disabled. However, it is
-        // non zero due to https://github.com/apache/arrow-rs/issues/8307
-        expected_records: 3,
+        expected_inner_records: 0,
+        expected_records: 0,
     }
     .run(&ctx)
     .await