
Commit aadc433

Merge pull request ClickHouse#84745 from ClickHouse/delta-lake-fix-column-pruning
Fix column pruning with delta-kernel
2 parents: 0d53a65 + 2337840

File tree: 3 files changed, +106 −11 lines

src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp

Lines changed: 16 additions & 5 deletions
@@ -63,17 +63,28 @@ ReadFromFormatInfo DeltaLakeMetadataDeltaKernel::prepareReadingFromFormat(
 {
     auto info = DB::prepareReadingFromFormat(requested_columns, storage_snapshot, context, supports_subset_of_columns);
 
-    info.format_header.clear();
-    for (const auto & [column_name, column_type] : table_snapshot->getReadSchema())
-        info.format_header.insert({column_type->createColumn(), column_type, column_name});
-
     /// Read schema is different from table schema in case:
     /// 1. we have partition columns (they are not stored in the actual data)
     /// 2. columnMapping.mode = 'name' or 'id'.
     /// So we add partition columns to read schema and put it together into format_header.
     /// Partition values will be added to result data right after data is read.
-
     const auto & physical_names_map = table_snapshot->getPhysicalNamesMap();
+    const auto read_columns = table_snapshot->getReadSchema().getNameSet();
+
+    Block format_header;
+    for (auto && column_with_type_and_name : info.format_header)
+    {
+        auto physical_name = DeltaLake::getPhysicalName(column_with_type_and_name.name, physical_names_map);
+        if (!read_columns.contains(physical_name))
+        {
+            LOG_TEST(log, "Filtering out non-readable column: {}", column_with_type_and_name.name);
+            continue;
+        }
+        column_with_type_and_name.name = physical_name;
+        format_header.insert(std::move(column_with_type_and_name));
+    }
+    info.format_header = std::move(format_header);
+
     /// Update requested columns to reference actual physical column names.
     if (!physical_names_map.empty())
     {
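
The new loop starts from the header produced by prepareReadingFromFormat and keeps only the columns that the delta-kernel read schema can actually serve, renaming each survivor to its physical name. Below is a minimal, self-contained sketch of that pruning step, using hypothetical stand-in types (plain structs and standard containers rather than ClickHouse's Block/IDataType API):

// Sketch only: hypothetical stand-ins for the header, read schema and
// physical-name map; not the actual ClickHouse API.
#include <iostream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

struct Column { std::string name; std::string type; };

std::string getPhysicalName(
    const std::string & logical,
    const std::unordered_map<std::string, std::string> & physical_names_map)
{
    auto it = physical_names_map.find(logical);
    return it == physical_names_map.end() ? logical : it->second;
}

std::vector<Column> pruneToReadSchema(
    std::vector<Column> header,
    const std::unordered_set<std::string> & read_columns,
    const std::unordered_map<std::string, std::string> & physical_names_map)
{
    std::vector<Column> result;
    for (auto & column : header)
    {
        auto physical_name = getPhysicalName(column.name, physical_names_map);
        if (!read_columns.contains(physical_name))
            continue;                    /// not readable from the data files, drop it
        column.name = physical_name;     /// reference the physical column name
        result.push_back(std::move(column));
    }
    return result;
}

int main()
{
    /// "year" plays the role of a partition column absent from the read schema.
    std::vector<Column> header = {{"id", "Int32"}, {"name", "String"}, {"year", "String"}};
    std::unordered_set<std::string> read_columns = {"col-1", "col-2"};
    std::unordered_map<std::string, std::string> physical_names_map = {{"id", "col-1"}, {"name", "col-2"}};

    for (const auto & column : pruneToReadSchema(header, read_columns, physical_names_map))
        std::cout << column.name << '\n';   /// prints: col-1, col-2
}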

src/Storages/ObjectStorage/StorageObjectStorageSource.cpp

Lines changed: 15 additions & 6 deletions
@@ -550,15 +550,24 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
 
     builder.init(Pipe(input_format));
 
-    std::shared_ptr<const ActionsDAG> transformer;
-    if (object_info->data_lake_metadata)
-        transformer = object_info->data_lake_metadata->transform;
+    std::optional<ActionsDAG> transformer;
+    if (object_info->data_lake_metadata && object_info->data_lake_metadata->transform)
+    {
+        transformer = object_info->data_lake_metadata->transform->clone();
+        /// FIXME: This is currently not done for the below case (configuration->getSchemaTransformer())
+        /// because it is an iceberg case where transformer contains columns ids (just increasing numbers)
+        /// which do not match requested_columns (while here requested_columns were adjusted to match physical columns).
+        transformer->removeUnusedActions(read_from_format_info.requested_columns.getNames());
+    }
     if (!transformer)
-        transformer = configuration->getSchemaTransformer(context_, object_info->getPath());
+    {
+        if (auto schema_transformer = configuration->getSchemaTransformer(context_, object_info->getPath()))
+            transformer = schema_transformer->clone();
+    }
 
-    if (transformer)
+    if (transformer.has_value())
     {
-        auto schema_modifying_actions = std::make_shared<ExpressionActions>(transformer->clone());
+        auto schema_modifying_actions = std::make_shared<ExpressionActions>(std::move(transformer.value()));
         builder.addSimpleTransform([&](const SharedHeader & header)
         {
             return std::make_shared<ExpressionTransform>(header, schema_modifying_actions);
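
The transformer is now held as a locally owned std::optional cloned from the shared data-lake metadata, so removeUnusedActions can prune it down to the requested columns without mutating shared state, and the pruned copy is then moved into the expression step. A compact sketch of that ownership pattern follows, again with hypothetical stand-in types instead of the real ActionsDAG/ExpressionActions classes:

// Sketch only: "Dag" is a hypothetical stand-in for a shared, immutable
// transformer that is cloned, pruned locally, and moved into the consumer.
#include <algorithm>
#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <vector>

struct Dag
{
    std::vector<std::string> outputs;

    Dag clone() const { return *this; }

    /// Keep only the outputs that were actually requested.
    void removeUnusedActions(const std::vector<std::string> & requested)
    {
        std::erase_if(outputs, [&](const std::string & name)
        {
            return std::find(requested.begin(), requested.end(), name) == requested.end();
        });
    }
};

int main()
{
    /// Shared, immutable transformer coming from the data lake metadata.
    auto shared_transform = std::make_shared<const Dag>(Dag{{"id", "name", "age", "country", "year"}});
    std::vector<std::string> requested_columns = {"id"};

    std::optional<Dag> transformer;
    if (shared_transform)
    {
        transformer = shared_transform->clone();              /// local, mutable copy
        transformer->removeUnusedActions(requested_columns);  /// prune without touching shared state
    }

    if (transformer.has_value())
    {
        Dag actions = std::move(transformer.value());         /// hand ownership to the consumer
        for (const auto & name : actions.outputs)
            std::cout << name << '\n';                        /// prints: id
    }
}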

tests/integration/test_storage_delta/test.py

Lines changed: 75 additions & 0 deletions
@@ -2208,3 +2208,78 @@ def test_filtering_by_virtual_columns(started_cluster):
             f"SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log WHERE query_id = '{query_id}' and type = 'QueryFinish'"
         )
     )
+
+
+def test_column_pruning(started_cluster):
+    instance = started_cluster.instances["node1"]
+    spark = started_cluster.spark_session
+    minio_client = started_cluster.minio_client
+    bucket = started_cluster.minio_bucket
+    TABLE_NAME = randomize_table_name("test_column_pruning")
+    result_file = f"{TABLE_NAME}"
+    partition_columns = []
+
+    schema = StructType(
+        [
+            StructField("id", IntegerType(), nullable=False),
+            StructField("name", StringType(), nullable=False),
+            StructField("age", IntegerType(), nullable=False),
+            StructField("country", StringType(), nullable=False),
+            StructField("year", StringType(), nullable=False),
+        ]
+    )
+
+    num_rows = 10000
+    now = datetime.now()
+    data = [
+        (i, f"name_{i}", 32, "".join("a" for _ in range(100)), "2025")
+        for i in range(num_rows)
+    ]
+    df = spark.createDataFrame(data=data, schema=schema)
+    df.printSchema()
+    df.write.mode("append").format("delta").partitionBy(partition_columns).save(
+        f"/{TABLE_NAME}"
+    )
+
+    minio_client = started_cluster.minio_client
+    bucket = started_cluster.minio_bucket
+
+    files = upload_directory(minio_client, bucket, f"/{TABLE_NAME}", "")
+    assert len(files) > 0
+    print(f"Uploaded files: {files}")
+
+    table_function = f"deltaLake('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{result_file}/', 'minio', '{minio_secret_key}')"
+
+    query_id = f"query_{TABLE_NAME}_1"
+    sum = int(
+        instance.query(
+            f"SELECT sum(id) FROM {table_function} SETTINGS allow_experimental_delta_kernel_rs=0, max_read_buffer_size_remote_fs=100",
+            query_id=query_id,
+        )
+    )
+    instance.query("SYSTEM FLUSH LOGS")
+    assert 107220 == int(
+        instance.query(
+            f"SELECT ProfileEvents['ReadBufferFromS3Bytes'] FROM system.query_log WHERE query_id = '{query_id}' and type = 'QueryFinish'"
+        )
+    )
+
+    query_id = f"query_{TABLE_NAME}_2"
+    assert sum == int(
+        instance.query(
+            f"SELECT sum(id) FROM {table_function} SETTINGS enable_filesystem_cache=0, max_read_buffer_size_remote_fs=100",
+            query_id=query_id,
+        )
+    )
+    instance.query("SYSTEM FLUSH LOGS")
+    assert 1 == int(
+        instance.query(
+            f"SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log WHERE query_id = '{query_id}' and type = 'QueryFinish'"
+        )
+    )
+    # Small diff because in case of delta-kernel metadata reading is not counted in the metric.
+    assert 105677 == int(
+        instance.query(
+            f"SELECT ProfileEvents['ReadBufferFromS3Bytes'] FROM system.query_log WHERE query_id = '{query_id}' and type = 'QueryFinish'"
+        )
+    )
