Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/Processors/Formats/Impl/Parquet/Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ void Reader::prefilterAndInitRowGroups(const std::optional<std::unordered_set<UI
auto add_prewhere_outputs = [&](const ActionsDAG & actions)
{
for (const auto * node : actions.getOutputs())
if (sample_block->has(node->result_name))
if (node->type != ActionsDAG::ActionType::INPUT && sample_block->has(node->result_name))
schemer.external_columns.push_back(node->result_name);
};
if (row_level_filter)
Expand Down Expand Up @@ -688,7 +688,8 @@ void Reader::preparePrewhere()
else
{
if (!prewhere_output_column_idxs.contains(idx_in_output_block))
throw Exception(ErrorCodes::LOGICAL_ERROR, "PREWHERE appears to use its own output as input");
throw Exception(ErrorCodes::LOGICAL_ERROR, "PREWHERE appears to use its own output as input: column '{}' (idx {})",
col.name, idx_in_output_block);
}
step.input_idxs.push_back(idx_in_output_block);
}
Expand Down
10 changes: 10 additions & 0 deletions src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,15 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
return res;
}

/// Whether PREWHERE can be pushed down to this data lake table.
/// Enabled only for Iceberg metadata, and only when built with Avro support
/// (presumably because reading Iceberg metadata requires Avro — confirm).
bool supportsPrewhere() const override
{
#if USE_AVRO
return std::is_same_v<DataLakeMetadata, IcebergMetadata>;
#else
return false;
#endif
}

private:
const DataLakeStorageSettingsPtr settings;
ObjectStoragePtr ready_object_storage;
Expand Down Expand Up @@ -599,6 +608,7 @@ class StorageIcebergConfiguration : public StorageObjectStorageConfiguration, pu
bool supportsFileIterator() const override { return getImpl().supportsFileIterator(); }
bool supportsParallelInsert() const override { return getImpl().supportsParallelInsert(); }
bool supportsWrites() const override { return getImpl().supportsWrites(); }
/// Forwards the PREWHERE-support query to the wrapped configuration implementation.
bool supportsPrewhere() const override { return getImpl().supportsPrewhere(); }

bool supportsPartialPathPrefix() const override { return getImpl().supportsPartialPathPrefix(); }

Expand Down
2 changes: 1 addition & 1 deletion src/Storages/ObjectStorage/StorageObjectStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ StorageObjectStorage::StorageObjectStorage(
/// There's probably no reason for this, and it should just copy those fields like the others.
/// * If the table contains files in different formats, with only some of them supporting
/// prewhere, things break.
supports_prewhere = !configuration->isDataLakeConfiguration() && format_supports_prewhere;
supports_prewhere = configuration->supportsPrewhere() && format_supports_prewhere;
supports_tuple_elements = format_supports_prewhere;

StorageInMemoryMetadata metadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,11 @@ class StorageObjectStorageConfiguration
return false;
}

/// Whether PREWHERE pushdown is supported for this storage configuration.
/// Defaults to true; specific configurations (e.g. data lakes) override this
/// to restrict support.
virtual bool supportsPrewhere() const
{
return true;
}

virtual void drop(ContextPtr) {}

virtual bool isClusterSupported() const { return true; }
Expand Down
2 changes: 1 addition & 1 deletion src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
auto mapper = configuration->getColumnMapperForObject(object_info);
if (!mapper)
return format_filter_info;
return std::make_shared<FormatFilterInfo>(format_filter_info->filter_actions_dag, format_filter_info->context.lock(), mapper, nullptr, nullptr);
return std::make_shared<FormatFilterInfo>(format_filter_info->filter_actions_dag, format_filter_info->context.lock(), mapper, format_filter_info->row_level_filter, format_filter_info->prewhere_info);
}();

LOG_DEBUG(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@

@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
@pytest.mark.parametrize("is_table_function", [False, True])
def test_minmax_pruning(started_cluster_iceberg_with_spark, storage_type, is_table_function):
@pytest.mark.parametrize("use_prewhere", [False, True])
def test_minmax_pruning(started_cluster_iceberg_with_spark, storage_type, is_table_function, use_prewhere):
instance = started_cluster_iceberg_with_spark.instances["node1"]
spark = started_cluster_iceberg_with_spark.spark_session
TABLE_NAME = "test_minmax_pruning_" + storage_type + "_" + get_uuid_str()
Expand Down Expand Up @@ -95,6 +96,11 @@ def check_validity_and_get_prunned_files(select_expression):
instance, TABLE_NAME, settings1, settings2, 'IcebergMinMaxIndexPrunedFiles', select_expression
)


where_condition = "WHERE"
if use_prewhere and not is_table_function:
where_condition = "PREWHERE"

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} ORDER BY ALL"
Expand All @@ -103,55 +109,55 @@ def check_validity_and_get_prunned_files(select_expression):
)
assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE date <= '2024-01-25' ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} date <= '2024-01-25' ORDER BY ALL"
)
== 3
)
assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE ts <= timestamp('2024-03-20 14:00:00.000000') ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} ts <= timestamp('2024-03-20 14:00:00.000000') ORDER BY ALL"
)
== 3
)

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE tag == 1 ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} tag == 1 ORDER BY ALL"
)
== 3
)

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE tag <= 1 ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} tag <= 1 ORDER BY ALL"
)
== 3
)

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE name == 'vasilisa' ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} name == 'vasilisa' ORDER BY ALL"
)
== 3
)

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE name < 'kek' ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} name < 'kek' ORDER BY ALL"
)
== 2
)

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE number == 8 ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} number == 8 ORDER BY ALL"
)
== 3
)

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE number <= 5 ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} number <= 5 ORDER BY ALL"
)
== 3
)
Expand All @@ -163,7 +169,7 @@ def check_validity_and_get_prunned_files(select_expression):

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE date3 <= '2024-01-25' ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} date3 <= '2024-01-25' ORDER BY ALL"
)
== 3
)
Expand All @@ -172,14 +178,14 @@ def check_validity_and_get_prunned_files(select_expression):

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE tag <= 1 ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} tag <= 1 ORDER BY ALL"
)
== 3
)

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE time_struct.a <= '2024-02-01' ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} time_struct.a <= '2024-02-01' ORDER BY ALL"
)
== 3
)
Expand All @@ -190,7 +196,7 @@ def check_validity_and_get_prunned_files(select_expression):

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE time_struct.a <= '2024-02-01' ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} time_struct.a <= '2024-02-01' ORDER BY ALL"
)
== 4
)
Expand All @@ -211,15 +217,15 @@ def check_validity_and_get_prunned_files(select_expression):

assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE ddd >= 100 ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} ddd >= 100 ORDER BY ALL"
)
== 2
)
# Spark stores rounded values of decimals; this query checks that we work around it.
# Please check the code where we parse lower bounds and upper bounds
assert (
check_validity_and_get_prunned_files(
f"SELECT * FROM {creation_expression} WHERE ddd >= toDecimal64('17.21', 3) ORDER BY ALL"
f"SELECT * FROM {creation_expression} {where_condition} ddd >= toDecimal64('17.21', 3) ORDER BY ALL"
)
== 1
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
0 0
1 0
2 0
3 0
4 0
5 0
6 0
7 0
8 0
9 0
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
-- Tags: no-fasttest

-- Regression test: PREWHERE on a Parquet file with a column declared in the schema
-- but not present in the actual file. When the missing column appears both in the
-- PREWHERE condition and in the SELECT list, it stays in the format header as a
-- pass-through INPUT node in the prewhere DAG. The `add_prewhere_outputs` lambda
-- incorrectly added such INPUT nodes to `external_columns`, which prevented the
-- SchemaConverter from creating a missing-column entry. This caused "KeyCondition
-- uses PREWHERE output" or "PREWHERE appears to use its own output as input"
-- exceptions (LOGICAL_ERROR).

-- Use the native Parquet reader v3 (the code path under test), tolerate schema
-- columns absent from the file, and truncate the file() target on insert so the
-- test is re-runnable.
SET input_format_parquet_use_native_reader_v3 = 1;
SET input_format_parquet_allow_missing_columns = 1;
SET engine_file_truncate_on_insert = 1;

INSERT INTO FUNCTION file(currentDatabase() || '_04001.parquet')
SELECT number FROM numbers(10);

-- `extra` is not in the Parquet file; it defaults to 0 with allow_missing_columns.
-- The SELECT uses both `number` and `extra`, so `extra` remains in format_header as
-- a pass-through. Without the fix, `add_prewhere_outputs` incorrectly added `extra`
-- to `external_columns`, preventing SchemaConverter from creating a missing column.
SELECT number, extra FROM file(currentDatabase() || '_04001.parquet', Parquet, 'number UInt64, extra UInt64')
PREWHERE extra = 0
ORDER BY number;
Loading