Skip to content

Commit 2337840

Browse files
committed
Fix iceberg tests
1 parent 30decd6 commit 2337840

File tree

2 files changed

+19
-12
lines changed

2 files changed

+19
-12
lines changed

src/Storages/ObjectStorage/StorageObjectStorageSource.cpp

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -550,17 +550,24 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
550550

551551
builder.init(Pipe(input_format));
552552

553-
std::shared_ptr<const ActionsDAG> transformer;
554-
if (object_info->data_lake_metadata)
555-
transformer = object_info->data_lake_metadata->transform;
553+
std::optional<ActionsDAG> transformer;
554+
if (object_info->data_lake_metadata && object_info->data_lake_metadata->transform)
555+
{
556+
transformer = object_info->data_lake_metadata->transform->clone();
557+
/// FIXME: This is currently not done for the below case (configuration->getSchemaTransformer())
558+
/// because it is an Iceberg case where the transformer contains column IDs (just increasing numbers)
559+
/// which do not match requested_columns (while here requested_columns were adjusted to match physical columns).
560+
transformer->removeUnusedActions(read_from_format_info.requested_columns.getNames());
561+
}
556562
if (!transformer)
557-
transformer = configuration->getSchemaTransformer(context_, object_info->getPath());
563+
{
564+
if (auto schema_transformer = configuration->getSchemaTransformer(context_, object_info->getPath()))
565+
transformer = schema_transformer->clone();
566+
}
558567

559-
if (transformer)
568+
if (transformer.has_value())
560569
{
561-
auto transformer_copy = transformer->clone();
562-
transformer_copy.removeUnusedActions(read_from_format_info.requested_columns.getNames());
563-
auto schema_modifying_actions = std::make_shared<ExpressionActions>(std::move(transformer_copy));
570+
auto schema_modifying_actions = std::make_shared<ExpressionActions>(std::move(transformer.value()));
564571
builder.addSimpleTransform([&](const SharedHeader & header)
565572
{
566573
return std::make_shared<ExpressionTransform>(header, schema_modifying_actions);

tests/integration/test_storage_delta/test.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2229,10 +2229,10 @@ def test_column_pruning(started_cluster):
22292229
]
22302230
)
22312231

2232-
num_rows = 100000
2232+
num_rows = 10000
22332233
now = datetime.now()
22342234
data = [
2235-
(i, f"name_{i}", 32, "".join("a" for _ in range(1000)), "2025")
2235+
(i, f"name_{i}", 32, "".join("a" for _ in range(100)), "2025")
22362236
for i in range(num_rows)
22372237
]
22382238
df = spark.createDataFrame(data=data, schema=schema)
@@ -2258,7 +2258,7 @@ def test_column_pruning(started_cluster):
22582258
)
22592259
)
22602260
instance.query("SYSTEM FLUSH LOGS")
2261-
assert 467413 == int(
2261+
assert 107220 == int(
22622262
instance.query(
22632263
f"SELECT ProfileEvents['ReadBufferFromS3Bytes'] FROM system.query_log WHERE query_id = '{query_id}' and type = 'QueryFinish'"
22642264
)
@@ -2278,7 +2278,7 @@ def test_column_pruning(started_cluster):
22782278
)
22792279
)
22802280
# Small diff because, in the case of delta-kernel, metadata reading is not counted in the metric.
2281-
assert 465864 == int(
2281+
assert 105677 == int(
22822282
instance.query(
22832283
f"SELECT ProfileEvents['ReadBufferFromS3Bytes'] FROM system.query_log WHERE query_id = '{query_id}' and type = 'QueryFinish'"
22842284
)

0 commit comments

Comments
 (0)