Skip to content

Commit 3b85aca

Browse files
authored
Merge pull request #1095 from Altinity/ports/25.8/959_iceberg_addtional_columns_in_system_tables
Antalya 25.8: Port of #959 and #1026 iceberg additional columns in system tables
2 parents 852e5f5 + b6b6e61 commit 3b85aca

File tree

6 files changed

+309
-15
lines changed

6 files changed

+309
-15
lines changed

src/Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,9 @@ class IDataLakeMetadata : boost::noncopyable
166166
virtual void checkAlterIsPossible(const AlterCommands & /*commands*/) { throwNotImplemented("alter"); }
167167
virtual void alter(const AlterCommands & /*params*/, ContextPtr /*context*/) { throwNotImplemented("alter"); }
168168

169+
/// Partition key of the data lake table as a string, for metadata implementations that can provide one.
/// The base implementation ignores the context and returns an empty optional.
virtual std::optional<String> partitionKey(ContextPtr) const { return {}; }
170+
/// Sorting key of the data lake table as a string, for metadata implementations that can provide one.
/// The base implementation ignores the context and returns an empty optional.
virtual std::optional<String> sortingKey(ContextPtr) const { return {}; }
171+
169172
protected:
170173
virtual ObjectIterator createKeysIterator(
171174
Strings && data_files_,

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.cpp

Lines changed: 203 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,192 @@ bool IcebergMetadata::update(const ContextPtr & local_context)
258258
return previous_snapshot_schema_id != relevant_snapshot_schema_id;
259259
}
260260

261+
namespace
262+
{
263+
264+
using IdToName = std::unordered_map<Int32, String>;
265+
266+
/// Builds a map from Iceberg field id to column name for the table's current schema.
///
/// The current schema is looked up in the "schemas" array by "current-schema-id".
/// If that lookup finds nothing (the keys are absent, or no entry matches), the
/// legacy top-level "schema" object is used instead, so metadata files that only
/// carry the deprecated single-schema form still resolve real column names
/// instead of falling back to `col_<id>` placeholders.
///
/// Only top-level fields that have both "id" and "name" are recorded.
/// Returns an empty map when `metadata_obj` is null or no schema can be found.
IdToName buildIdToNameMap(const Poco::JSON::Object::Ptr & metadata_obj)
{
    IdToName map;
    if (!metadata_obj)
        return map;

    /// Records every well-formed field of `schema` into `map`.
    const auto collect_fields = [&map](const Poco::JSON::Object::Ptr & schema)
    {
        if (!schema)
            return;

        const auto fields = schema->getArray("fields");
        if (!fields)
            return;

        for (size_t j = 0; j < fields->size(); ++j)
        {
            const auto f = fields->getObject(j);
            if (!f || !f->has("id") || !f->has("name"))
                continue;
            map.emplace(f->getValue<Int32>("id"), f->getValue<String>("name"));
        }
    };

    if (metadata_obj->has("current-schema-id") && metadata_obj->has("schemas"))
    {
        const auto current_schema_id = metadata_obj->getValue<Int32>("current-schema-id");
        if (const auto schemas = metadata_obj->getArray("schemas"))
        {
            for (size_t i = 0; i < schemas->size(); ++i)
            {
                const auto schema = schemas->getObject(i);
                if (!schema || !schema->has("schema-id") || (schema->getValue<Int32>("schema-id") != current_schema_id))
                    continue;

                /// The first schema with the matching id is authoritative.
                collect_fields(schema);
                return map;
            }
        }
    }

    /// Legacy layout: a single top-level "schema" object describing the current schema.
    if (metadata_obj->has("schema"))
        collect_fields(metadata_obj->getObject("schema"));

    return map;
}
298+
299+
/// Renders one Iceberg transform applied to a source column as a readable expression.
///
/// `transform` is the transform string from the metadata, optionally carrying a
/// bracketed parameter (e.g. `name[param]`). The source column is resolved from
/// the field's "source-id" through `id_to_name`; an unknown or missing id is
/// rendered as `col_<id>` (with id -1 when "source-id" is absent).
///
/// Result:
///  - "identity"                       -> `<column>`
///  - "year" / "month" / "day" / "hour" -> `<name>(<column>)`
///  - "void"                           -> empty string
///  - anything else                    -> `<name>(<param>, <column>)` if a parameter
///                                        was given, otherwise `<name>(<column>)`
String formatTransform(
    const String & transform,
    const Poco::JSON::Object::Ptr & field_obj,
    const IdToName & id_to_name)
{
    Int32 source_id = -1;
    if (field_obj && field_obj->has("source-id"))
        source_id = field_obj->getValue<Int32>("source-id");

    String column_name;
    if (const auto found = id_to_name.find(source_id); found != id_to_name.end())
        column_name = found->second;
    else
        column_name = "col_" + toString(source_id);

    /// Split "name[param]" into its name and parameter; otherwise the whole string is the name.
    String name = transform;
    String argument;
    const auto open_bracket = transform.find('[');
    if (open_bracket != String::npos && transform.back() == ']')
    {
        name = transform.substr(0, open_bracket);
        argument = transform.substr(open_bracket + 1, transform.size() - open_bracket - 2);
    }

    if (name == "identity")
        return column_name;

    if (name == "void")
        return {};

    const bool is_time_transform = (name == "year" || name == "month" || name == "day" || name == "hour");
    if (is_time_transform || argument.empty())
        return name + "(" + column_name + ")";

    return name + "(" + argument + ", " + column_name + ")";
}
333+
334+
/// Locates the array of partition fields that is currently in effect.
///
/// A top-level "partition-spec" array wins if the key is present. Otherwise the
/// entry of "partition-specs" whose "spec-id" equals "default-spec-id" is used,
/// and its "fields" array is returned.
/// Returns nullptr when `metadata_obj` is null or nothing suitable is found.
Poco::JSON::Array::Ptr findActivePartitionFields(const Poco::JSON::Object::Ptr & metadata_obj)
{
    if (!metadata_obj)
        return nullptr;

    if (metadata_obj->has("partition-spec"))
        return metadata_obj->getArray("partition-spec");

    /// No "partition-spec": fall back to "partition-specs" + "default-spec-id".
    if (!metadata_obj->has("partition-specs") || !metadata_obj->has("default-spec-id"))
        return nullptr;

    const auto wanted_spec_id = metadata_obj->getValue<Int32>("default-spec-id");
    const auto all_specs = metadata_obj->getArray("partition-specs");
    if (!all_specs)
        return nullptr;

    for (size_t idx = 0; idx < all_specs->size(); ++idx)
    {
        const auto candidate = all_specs->getObject(idx);
        if (!candidate || !candidate->has("spec-id"))
            continue;
        if (candidate->getValue<Int32>("spec-id") != wanted_spec_id)
            continue;

        if (!candidate->has("fields"))
            return nullptr;
        return candidate->getArray("fields");
    }

    return nullptr;
}
361+
362+
/// Locates the array of sort fields that is currently in effect.
///
/// Picks the entry of "sort-orders" whose "order-id" equals
/// "default-sort-order-id" and returns its "fields" array.
/// Returns nullptr when `metadata_obj` is null, either key is missing,
/// no entry matches, or the matching entry has no "fields".
Poco::JSON::Array::Ptr findActiveSortFields(const Poco::JSON::Object::Ptr & metadata_obj)
{
    if (!metadata_obj)
        return nullptr;

    const bool has_required_keys = metadata_obj->has("default-sort-order-id") && metadata_obj->has("sort-orders");
    if (!has_required_keys)
        return nullptr;

    const auto wanted_order_id = metadata_obj->getValue<Int32>("default-sort-order-id");
    const auto all_orders = metadata_obj->getArray("sort-orders");
    if (!all_orders)
        return nullptr;

    for (size_t idx = 0; idx < all_orders->size(); ++idx)
    {
        const auto candidate = all_orders->getObject(idx);
        if (!candidate || !candidate->has("order-id"))
            continue;
        if (candidate->getValue<Int32>("order-id") != wanted_order_id)
            continue;

        if (!candidate->has("fields"))
            return nullptr;
        return candidate->getArray("fields");
    }

    return nullptr;
}
382+
383+
/// Formats an array of partition or sort fields as a comma-separated expression list.
///
/// Each field's "transform" (defaulting to "identity" when absent) is rendered
/// via formatTransform; fields that render to an empty string are skipped.
/// When `lookup_sort_modifiers` is set and a field has a "direction", " DESC" is
/// appended for a case-insensitive "desc" and " ASC" for any other value.
/// Returns an empty string if `fields` is null or empty, or if nothing was rendered.
String composeList(
    const Poco::JSON::Array::Ptr & fields,
    const IdToName & id_to_name,
    bool lookup_sort_modifiers)
{
    String joined;
    if (!fields)
        return joined;

    const size_t field_count = fields->size();
    for (size_t idx = 0; idx < field_count; ++idx)
    {
        const auto field = fields->getObject(idx);
        if (!field)
            continue;

        String transform = "identity";
        if (field->has("transform"))
            transform = field->getValue<String>("transform");

        String rendered = formatTransform(transform, field, id_to_name);
        if (rendered.empty())
            continue;

        if (lookup_sort_modifiers && field->has("direction"))
        {
            const auto direction = field->getValue<String>("direction");
            if (Poco::icompare(direction, "desc") == 0)
                rendered += " DESC";
            else
                rendered += " ASC";
        }

        /// Rendered items are never empty, so a non-empty accumulator means this is not the first item.
        if (!joined.empty())
            joined += ", ";
        joined += rendered;
    }

    return joined;
}
428+
429+
/// Derives the textual partition key and sorting key from Iceberg table metadata.
///
/// Returns {partition_key, sorting_key}. Both are empty optionals when
/// `metadata_obj` is null; otherwise both hold a (possibly empty) string
/// produced by composeList from the active partition / sort fields.
std::pair<std::optional<String>, std::optional<String>> extractIcebergKeys(const Poco::JSON::Object::Ptr & metadata_obj)
{
    if (!metadata_obj)
        return {};

    const auto id_to_name = buildIdToNameMap(metadata_obj);

    std::optional<String> partition_key
        = composeList(findActivePartitionFields(metadata_obj), id_to_name, /*lookup_sort_modifiers=*/ false);
    std::optional<String> sort_key
        = composeList(findActiveSortFields(metadata_obj), id_to_name, /*lookup_sort_modifiers=*/ true);

    return {std::move(partition_key), std::move(sort_key)};
}
444+
445+
}
446+
261447
void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Object::Ptr metadata_object)
262448
{
263449
auto configuration_ptr = configuration.lock();
@@ -309,6 +495,7 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec
309495
}
310496
}
311497

498+
auto [partition_key, sorting_key] = extractIcebergKeys(metadata_object);
312499
relevant_snapshot = std::make_shared<IcebergDataSnapshot>(
313500
getManifestList(
314501
object_storage,
@@ -324,7 +511,9 @@ void IcebergMetadata::updateSnapshot(ContextPtr local_context, Poco::JSON::Objec
324511
relevant_snapshot_id,
325512
total_rows,
326513
total_bytes,
327-
total_position_deletes);
514+
total_position_deletes,
515+
partition_key,
516+
sorting_key);
328517

329518
if (!snapshot->has(f_schema_id))
330519
throw Exception(
@@ -777,6 +966,19 @@ std::optional<size_t> IcebergMetadata::totalBytes(ContextPtr local_context) cons
777966
return result;
778967
}
779968

969+
/// Returns the partition key string stored in the currently relevant snapshot.
/// Returns an empty optional when no snapshot is set, so callers can fall back
/// to their default handling instead of dereferencing a null pointer.
std::optional<String> IcebergMetadata::partitionKey(ContextPtr) const
{
    SharedLockGuard lock(mutex);

    /// relevant_snapshot is only assigned in updateSnapshot; it may still be unset
    /// (presumably e.g. for a table without any snapshot yet).
    if (!relevant_snapshot)
        return std::nullopt;

    return relevant_snapshot->partition_key;
}
974+
975+
/// Returns the sorting key string stored in the currently relevant snapshot.
/// Returns an empty optional when no snapshot is set, so callers can fall back
/// to their default handling instead of dereferencing a null pointer.
std::optional<String> IcebergMetadata::sortingKey(ContextPtr) const
{
    SharedLockGuard lock(mutex);

    /// relevant_snapshot is only assigned in updateSnapshot; it may still be unset
    /// (presumably e.g. for a table without any snapshot yet).
    if (!relevant_snapshot)
        return std::nullopt;

    return relevant_snapshot->sorting_key;
}
980+
981+
780982
ObjectIterator IcebergMetadata::iterate(
781983
const ActionsDAG * filter_dag,
782984
FileProgressCallback callback,

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ class IcebergMetadata : public IDataLakeMetadata
121121
void checkAlterIsPossible(const AlterCommands & commands) override;
122122
void alter(const AlterCommands & params, ContextPtr context) override;
123123

124+
std::optional<String> partitionKey(ContextPtr) const override;
125+
std::optional<String> sortingKey(ContextPtr) const override;
126+
124127
protected:
125128
ObjectIterator
126129
iterate(const ActionsDAG * filter_dag, FileProgressCallback callback, size_t list_batch_size, ContextPtr local_context) const override;

src/Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ struct IcebergDataSnapshot
1717
std::optional<size_t> total_rows;
1818
std::optional<size_t> total_bytes;
1919
std::optional<size_t> total_position_delete_rows;
20+
std::optional<String> partition_key;
21+
std::optional<String> sorting_key;
2022

2123
std::optional<size_t> getTotalRows() const
2224
{

src/Storages/System/StorageSystemTables.cpp

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
#include <QueryPipeline/Pipe.h>
2323
#include <QueryPipeline/QueryPipelineBuilder.h>
2424
#include <Storages/MergeTree/MergeTreeData.h>
25+
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>
26+
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
2527
#include <Storages/SelectQueryInfo.h>
2628
#include <Storages/StorageView.h>
2729
#include <Storages/System/getQueriedColumnsMaskAndHeader.h>
@@ -604,18 +606,54 @@ class TablesBlockSource : public ISource
604606
ASTPtr expression_ptr;
605607
if (columns_mask[src_index++])
606608
{
607-
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getPartitionKeyAST()))
608-
res_columns[res_index++]->insert(format({context, *expression_ptr}));
609-
else
610-
res_columns[res_index++]->insertDefault();
609+
bool inserted = false;
610+
// Extract from specific DataLake metadata if suitable
611+
if (auto * obj = dynamic_cast<StorageObjectStorageCluster *>(table.get()))
612+
{
613+
if (auto * dl_meta = obj->getExternalMetadata(context))
614+
{
615+
if (auto p = dl_meta->partitionKey(context); p.has_value())
616+
{
617+
res_columns[res_index++]->insert(*p);
618+
inserted = true;
619+
}
620+
}
621+
622+
}
623+
624+
if (!inserted)
625+
{
626+
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getPartitionKeyAST()))
627+
res_columns[res_index++]->insert(format({context, *expression_ptr}));
628+
else
629+
res_columns[res_index++]->insertDefault();
630+
}
611631
}
612632

613633
if (columns_mask[src_index++])
614634
{
615-
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getSortingKey().expression_list_ast))
616-
res_columns[res_index++]->insert(format({context, *expression_ptr}));
617-
else
618-
res_columns[res_index++]->insertDefault();
635+
bool inserted = false;
636+
637+
// Extract from specific DataLake metadata if suitable
638+
if (auto * obj = dynamic_cast<StorageObjectStorageCluster *>(table.get()))
639+
{
640+
if (auto * dl_meta = obj->getExternalMetadata(context))
641+
{
642+
if (auto p = dl_meta->sortingKey(context); p.has_value())
643+
{
644+
res_columns[res_index++]->insert(*p);
645+
inserted = true;
646+
}
647+
}
648+
}
649+
650+
if (!inserted)
651+
{
652+
if (metadata_snapshot && (expression_ptr = metadata_snapshot->getSortingKey().expression_list_ast))
653+
res_columns[res_index++]->insert(format({context, *expression_ptr}));
654+
else
655+
res_columns[res_index++]->insertDefault();
656+
}
619657
}
620658

621659
if (columns_mask[src_index++])

0 commit comments

Comments
 (0)