Skip to content

Commit 1633469

Browse files
authored
Merge pull request #959 from Altinity/improvement/antalya/874
2 parents 09ab657 + c082b3b commit 1633469

File tree

6 files changed

+305
-10
lines changed

6 files changed

+305
-10
lines changed

src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ class IDataLakeMetadata : boost::noncopyable
6060
virtual std::optional<size_t> totalRows(ContextPtr) const { return {}; }
6161
virtual std::optional<size_t> totalBytes(ContextPtr) const { return {}; }
6262

63+
virtual std::optional<String> partitionKey(ContextPtr) const { return {}; }
64+
virtual std::optional<String> sortingKey(ContextPtr) const { return {}; }
65+
6366
protected:
6467
ObjectIterator createKeysIterator(
6568
Strings && data_files_,

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 206 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,197 @@ bool IcebergMetadata::update(const ContextPtr & local_context)
529529
return previous_snapshot_schema_id != relevant_snapshot_schema_id;
530530
}
531531

532+
namespace
533+
{
534+
535+
using IdToName = std::unordered_map<Int32, String>;
536+
537+
IdToName buildIdToNameMap(const Poco::JSON::Object::Ptr & metadata_obj)
538+
{
539+
IdToName map;
540+
if (!metadata_obj || !metadata_obj->has("current-schema-id") || !metadata_obj->has("schemas"))
541+
return map;
542+
543+
const auto current_schema_id = metadata_obj->getValue<Int32>("current-schema-id");
544+
auto schemas = metadata_obj->getArray("schemas");
545+
if (!schemas)
546+
return map;
547+
548+
for (size_t i = 0; i < schemas->size(); ++i)
549+
{
550+
auto schema = schemas->getObject(i);
551+
552+
if (!schema || !schema->has("schema-id") || (schema->getValue<Int32>("schema-id") != current_schema_id))
553+
continue;
554+
555+
if (auto fields = schema->getArray("fields"))
556+
{
557+
for (size_t j = 0; j < fields->size(); ++j)
558+
{
559+
auto f = fields->getObject(j);
560+
if (!f || !f->has("id") || !f->has("name"))
561+
continue;
562+
map.emplace(f->getValue<Int32>("id"), f->getValue<String>("name"));
563+
}
564+
}
565+
break;
566+
}
567+
return map;
568+
}
569+
570+
String formatTransform(
571+
const String & transform,
572+
const Poco::JSON::Object::Ptr & field_obj,
573+
const IdToName & id_to_name)
574+
{
575+
Int32 source_id = (field_obj && field_obj->has("source-id"))
576+
? field_obj->getValue<Int32>("source-id")
577+
: -1;
578+
579+
const auto it = id_to_name.find(source_id);
580+
const String col = (it != id_to_name.end()) ? it->second : ("col_" + toString(source_id));
581+
582+
String base = transform;
583+
String param;
584+
if (const auto lpos = transform.find('['); lpos != String::npos && transform.back() == ']')
585+
{
586+
base = transform.substr(0, lpos);
587+
param = transform.substr(lpos + 1, transform.size() - lpos - 2); // strip [ and ]
588+
}
589+
590+
String result;
591+
if (base == "identity")
592+
result = col;
593+
else if (base == "year" || base == "month" || base == "day" || base == "hour")
594+
result = base + "(" + col + ")";
595+
else if (base != "void")
596+
{
597+
if (!param.empty())
598+
result = base + "(" + param + ", " + col + ")";
599+
else
600+
result = base + "(" + col + ")";
601+
}
602+
return result;
603+
}
604+
605+
Poco::JSON::Array::Ptr findActivePartitionFields(const Poco::JSON::Object::Ptr & metadata_obj)
606+
{
607+
if (!metadata_obj)
608+
return nullptr;
609+
610+
if (metadata_obj->has("partition-spec"))
611+
return metadata_obj->getArray("partition-spec");
612+
613+
// If for some reason there is no partition-spec, try partition-specs + default-
614+
if (metadata_obj->has("partition-specs") && metadata_obj->has("default-spec-id"))
615+
{
616+
const auto default_spec_id = metadata_obj->getValue<Int32>("default-spec-id");
617+
if (auto specs = metadata_obj->getArray("partition-specs"))
618+
{
619+
for (size_t i = 0; i < specs->size(); ++i)
620+
{
621+
auto spec = specs->getObject(i);
622+
if (!spec || !spec->has("spec-id"))
623+
continue;
624+
if (spec->getValue<Int32>("spec-id") == default_spec_id)
625+
return spec->has("fields") ? spec->getArray("fields") : nullptr;
626+
}
627+
}
628+
}
629+
630+
return nullptr;
631+
}
632+
633+
Poco::JSON::Array::Ptr findActiveSortFields(const Poco::JSON::Object::Ptr & metadata_obj)
634+
{
635+
if (!metadata_obj || !metadata_obj->has("default-sort-order-id") || !metadata_obj->has("sort-orders"))
636+
return nullptr;
637+
638+
const auto default_sort_order_id = metadata_obj->getValue<Int32>("default-sort-order-id");
639+
auto orders = metadata_obj->getArray("sort-orders");
640+
if (!orders)
641+
return nullptr;
642+
643+
for (size_t i = 0; i < orders->size(); ++i)
644+
{
645+
auto order = orders->getObject(i);
646+
if (!order || !order->has("order-id"))
647+
continue;
648+
if (order->getValue<Int32>("order-id") == default_sort_order_id)
649+
return order->has("fields") ? order->getArray("fields") : nullptr;
650+
}
651+
return nullptr;
652+
}
653+
654+
String composeList(
655+
const Poco::JSON::Array::Ptr & fields,
656+
const IdToName & id_to_name,
657+
bool lookup_sort_modifiers)
658+
{
659+
if (!fields || fields->size() == 0)
660+
return {};
661+
662+
Strings parts;
663+
parts.reserve(fields->size());
664+
665+
for (size_t i = 0; i < fields->size(); ++i)
666+
{
667+
auto field = fields->getObject(i);
668+
if (!field)
669+
continue;
670+
671+
const String transform = field->has("transform") ? field->getValue<String>("transform") : "identity";
672+
String expr = formatTransform(transform, field, id_to_name);
673+
if (expr.empty())
674+
continue;
675+
676+
if (lookup_sort_modifiers)
677+
{
678+
if (field->has("direction"))
679+
{
680+
auto d = field->getValue<String>("direction");
681+
expr += (Poco::icompare(d, "desc") == 0) ? "DESC" : "ASC";
682+
}
683+
if (field->has("null-order"))
684+
{
685+
auto n = field->getValue<String>("null-order");
686+
expr += (Poco::icompare(n, "nulls-last") == 0) ? "NULLS LAST" : "NULLS FIRST";
687+
}
688+
}
689+
690+
parts.push_back(std::move(expr));
691+
}
692+
693+
if (parts.empty())
694+
return {};
695+
696+
String res;
697+
for (size_t i = 0; i < parts.size(); ++i)
698+
{
699+
if (i) res += ", ";
700+
res += parts[i];
701+
}
702+
return res;
703+
}
704+
705+
std::pair<std::optional<String>, std::optional<String>> extractIcebergKeys(const Poco::JSON::Object::Ptr & metadata_obj)
706+
{
707+
std::optional<String> partition_key;
708+
std::optional<String> sort_key;
709+
710+
if (metadata_obj)
711+
{
712+
auto id_to_name = buildIdToNameMap(metadata_obj);
713+
714+
partition_key = composeList(findActivePartitionFields(metadata_obj), id_to_name, /*lookup_sort_modifiers=*/ false);
715+
sort_key = composeList(findActiveSortFields(metadata_obj), id_to_name, /*lookup_sort_modifiers=*/ true);
716+
}
717+
718+
return {partition_key, sort_key};
719+
}
720+
721+
}
722+
532723
void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Object::Ptr metadata_object)
533724
{
534725
auto configuration_ptr = configuration.lock();
@@ -563,10 +754,11 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec
563754
total_bytes = summary_object->getValue<Int64>(f_total_files_size);
564755
}
565756

757+
auto [partition_key, sorting_key] = extractIcebergKeys(metadata_object);
566758
relevant_snapshot = IcebergSnapshot{
567759
getManifestList(local_context, getProperFilePathFromMetadataInfo(
568760
snapshot->getValue<String>(f_manifest_list), configuration_ptr->getPathForRead().path, table_location, configuration_ptr->getNamespace())),
569-
relevant_snapshot_id, total_rows, total_bytes};
761+
relevant_snapshot_id, total_rows, total_bytes, partition_key, sorting_key};
570762

571763
if (!snapshot->has(f_schema_id))
572764
throw Exception(
@@ -1011,6 +1203,19 @@ std::optional<size_t> IcebergMetadata::totalBytes(ContextPtr local_context) cons
10111203
return result;
10121204
}
10131205

1206+
std::optional<String> IcebergMetadata::partitionKey(ContextPtr) const
1207+
{
1208+
SharedLockGuard lock(mutex);
1209+
return relevant_snapshot->partition_key;
1210+
}
1211+
1212+
std::optional<String> IcebergMetadata::sortingKey(ContextPtr) const
1213+
{
1214+
SharedLockGuard lock(mutex);
1215+
return relevant_snapshot->sorting_key;
1216+
}
1217+
1218+
10141219
ObjectIterator IcebergMetadata::iterate(
10151220
const ActionsDAG * filter_dag,
10161221
FileProgressCallback callback,

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ class IcebergMetadata : public IDataLakeMetadata, private WithContext
8080
std::optional<size_t> totalRows(ContextPtr Local_context) const override;
8181
std::optional<size_t> totalBytes(ContextPtr Local_context) const override;
8282

83+
std::optional<String> partitionKey(ContextPtr) const override;
84+
std::optional<String> sortingKey(ContextPtr) const override;
85+
8386
protected:
8487
ObjectIterator iterate(
8588
const ActionsDAG * filter_dag,

src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ struct IcebergSnapshot
1616
Int64 snapshot_id;
1717
std::optional<size_t> total_rows;
1818
std::optional<size_t> total_bytes;
19+
std::optional<String> partition_key;
20+
std::optional<String> sorting_key;
1921
};
2022

2123
struct IcebergHistoryRecord

src/Storages/System/StorageSystemTables.cpp

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
#include <QueryPipeline/Pipe.h>
2323
#include <QueryPipeline/QueryPipelineBuilder.h>
2424
#include <Storages/MergeTree/MergeTreeData.h>
25+
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>
26+
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
2527
#include <Storages/SelectQueryInfo.h>
2628
#include <Storages/StorageView.h>
2729
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
@@ -595,18 +597,54 @@ class TablesBlockSource : public ISource
595597
ASTPtr expression_ptr;
596598
if (columns_mask[src_index++])
597599
{
598-
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getPartitionKeyAST()))
599-
res_columns[res_index++]->insert(format({context, *expression_ptr}));
600-
else
601-
res_columns[res_index++]->insertDefault();
600+
bool inserted = false;
601+
// Extract from specific DataLake metadata if suitable
602+
if (auto * obj = dynamic_cast<StorageObjectStorageCluster *>(table.get()))
603+
{
604+
if (auto * dl_meta = obj->getExternalMetadata(context))
605+
{
606+
if (auto p = dl_meta->partitionKey(context); p.has_value())
607+
{
608+
res_columns[res_index++]->insert(*p);
609+
inserted = true;
610+
}
611+
}
612+
613+
}
614+
615+
if (!inserted)
616+
{
617+
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getPartitionKeyAST()))
618+
res_columns[res_index++]->insert(format({context, *expression_ptr}));
619+
else
620+
res_columns[res_index++]->insertDefault();
621+
}
602622
}
603623

604624
if (columns_mask[src_index++])
605625
{
606-
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getSortingKey().expression_list_ast))
607-
res_columns[res_index++]->insert(format({context, *expression_ptr}));
608-
else
609-
res_columns[res_index++]->insertDefault();
626+
bool inserted = false;
627+
628+
// Extract from specific DataLake metadata if suitable
629+
if (auto * obj = dynamic_cast<StorageObjectStorageCluster *>(table.get()))
630+
{
631+
if (auto * dl_meta = obj->getExternalMetadata(context))
632+
{
633+
if (auto p = dl_meta->sortingKey(context); p.has_value())
634+
{
635+
res_columns[res_index++]->insert(*p);
636+
inserted = true;
637+
}
638+
}
639+
}
640+
641+
if (!inserted)
642+
{
643+
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getSortingKey().expression_list_ast))
644+
res_columns[res_index++]->insert(format({context, *expression_ptr}));
645+
else
646+
res_columns[res_index++]->insertDefault();
647+
}
610648
}
611649

612650
if (columns_mask[src_index++])

tests/integration/test_storage_iceberg/test.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3410,6 +3410,50 @@ def execute_spark_query(query: str):
34103410

34113411
instance.query(f"SELECT * FROM {table_select_expression} ORDER BY ALL")
34123412

3413+
@pytest.mark.parametrize("storage_type", ["s3"])
3414+
def test_system_tables_partition_sorting_keys(started_cluster, storage_type):
3415+
instance = started_cluster.instances["node1"]
3416+
spark = started_cluster.spark_session
3417+
3418+
table_name = f"test_sys_tables_keys_{storage_type}_{uuid.uuid4().hex[:8]}"
3419+
fq_table = f"spark_catalog.default.{table_name}"
3420+
3421+
spark.sql(f"DROP TABLE IF EXISTS {fq_table}")
3422+
spark.sql(f"""
3423+
CREATE TABLE {fq_table} (
3424+
id INT,
3425+
ts TIMESTAMP,
3426+
payload STRING
3427+
)
3428+
USING iceberg
3429+
PARTITIONED BY (bucket(16, id), day(ts))
3430+
TBLPROPERTIES ('format-version' = '2')
3431+
""")
3432+
spark.sql(f"ALTER TABLE {fq_table} WRITE ORDERED BY (id DESC NULLS LAST, hour(ts))")
3433+
spark.sql(f"""
3434+
INSERT INTO {fq_table} VALUES
3435+
(1, timestamp'2024-01-01 10:00:00', 'a'),
3436+
(2, timestamp'2024-01-02 11:00:00', 'b'),
3437+
(NULL, timestamp'2024-01-03 12:00:00', 'c')
3438+
""")
3439+
3440+
time.sleep(2)
3441+
default_upload_directory(
3442+
started_cluster,
3443+
storage_type,
3444+
f"/iceberg_data/default/{table_name}/",
3445+
f"/iceberg_data/default/{table_name}/",
3446+
)
3447+
3448+
create_iceberg_table(storage_type, instance, table_name, started_cluster)
3449+
3450+
res = instance.query(f"""
3451+
SELECT partition_key, sorting_key
3452+
FROM system.tables
3453+
WHERE name = '{table_name}' FORMAT csv
3454+
""").strip().lower()
3455+
3456+
assert res == '"bucket(16, id), day(ts)","iddescnulls last, hour(ts)ascnulls first"'
34133457

34143458
@pytest.mark.parametrize("storage_type", ["local", "s3"])
34153459
def test_compressed_metadata(started_cluster, storage_type):
@@ -3447,4 +3491,4 @@ def test_compressed_metadata(started_cluster, storage_type):
34473491

34483492
create_iceberg_table(storage_type, instance, TABLE_NAME, started_cluster, explicit_metadata_path="")
34493493

3450-
assert instance.query(f"SELECT * FROM {TABLE_NAME} WHERE not ignore(*)") == "1\tAlice\n2\tBob\n"
3494+
assert instance.query(f"SELECT * FROM {TABLE_NAME} WHERE not ignore(*)") == "1\tAlice\n2\tBob\n"

0 commit comments

Comments
 (0)