Skip to content

Commit cf1872f

Browse files
committed
Fix min/max value in Iceberg writes
1 parent 591b199 commit cf1872f

File tree

2 files changed

+21
-5
lines changed

2 files changed

+21
-5
lines changed

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1232,11 +1232,6 @@ void IcebergStorageSink::consume(Chunk & chunk)
12321232
{
12331233
auto [data_filename, data_filename_in_storage] = filename_generator.generateDataFileName();
12341234
data_filenames[partition_key] = data_filename;
1235-
if (!statistics.contains(partition_key))
1236-
{
1237-
statistics.emplace(partition_key, current_schema->getArray(Iceberg::f_fields));
1238-
}
1239-
statistics.at(partition_key).update(part_chunk);
12401235

12411236
auto buffer = object_storage->writeObject(
12421237
StoredObject(data_filename_in_storage), WriteMode::Rewrite, std::nullopt, DBMS_DEFAULT_BUFFER_SIZE, context->getWriteSettings());
@@ -1252,6 +1247,12 @@ void IcebergStorageSink::consume(Chunk & chunk)
12521247
configuration->getFormat(), *write_buffers[partition_key], *sample_block, context, format_settings);
12531248
}
12541249

1250+
if (!statistics.contains(partition_key))
1251+
{
1252+
statistics.emplace(partition_key, current_schema->getArray(Iceberg::f_fields));
1253+
}
1254+
statistics.at(partition_key).update(part_chunk);
1255+
12551256
writers[partition_key]->write(getHeader().cloneWithColumns(part_chunk.getColumns()));
12561257
}
12571258
}

tests/integration/test_storage_iceberg/test.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3744,3 +3744,18 @@ def test_system_tables_partition_sorting_keys(started_cluster, storage_type):
37443744
""").strip().lower()
37453745

37463746
assert res == '"bucket(16, id), day(ts)","id desc, hour(ts) asc"'
3747+
3748+
3749+
def test_iceberg_write_minmax(started_cluster):
3750+
instance = started_cluster.instances["node1"]
3751+
TABLE_NAME = "test_iceberg_write_minmax_" + get_uuid_str()
3752+
3753+
create_iceberg_table("local", instance, TABLE_NAME, started_cluster, "(x Int32, y Int32)", partition_by="identity(x)")
3754+
3755+
instance.query(f"INSERT INTO {TABLE_NAME} VALUES (1, 1), (1, 2)", settings={"allow_experimental_insert_into_iceberg": 1})
3756+
3757+
res = instance.query(f"SELECT x,y FROM {TABLE_NAME} WHERE y=1 ORDER BY ALL").strip()
3758+
assert res == "1\t1"
3759+
3760+
res = instance.query(f"SELECT x,y FROM {TABLE_NAME} WHERE y=2 ORDER BY ALL").strip()
3761+
assert res == "1\t2"

0 commit comments

Comments
 (0)