|
| 1 | +#include <Storages/System/StorageSystemIcebergHistory.h> |
| 2 | +#include <mutex> |
| 3 | +#include <DataTypes/DataTypesNumber.h> |
| 4 | +#include <DataTypes/DataTypeString.h> |
| 5 | +#include <DataTypes/DataTypeMap.h> |
| 6 | +#include <DataTypes/DataTypeDateTime.h> |
| 7 | +#include <DataTypes/DataTypeDate.h> |
| 8 | +#include <DataTypes/DataTypeUUID.h> |
| 9 | +#include <DataTypes/DataTypeNullable.h> |
| 10 | +#include <DataTypes/DataTypeDateTime64.h> |
| 11 | +#include <Interpreters/InterpreterSelectQuery.h> |
| 12 | +#include <Processors/LimitTransform.h> |
| 13 | +#include <Processors/Port.h> |
| 14 | +#include <Processors/QueryPlan/QueryPlan.h> |
| 15 | +#include <Processors/QueryPlan/ReadFromSystemNumbersStep.h> |
| 16 | +#include <Storages/SelectQueryInfo.h> |
| 17 | +#include <Storages/ObjectStorage/StorageObjectStorage.h> |
| 18 | +#include <Access/ContextAccess.h> |
| 19 | +#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h> |
| 20 | +#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadata.h> |
| 21 | +#include <Interpreters/DatabaseCatalog.h> |
| 22 | +#include <Core/Settings.h> |
| 23 | + |
| 24 | +static constexpr auto TIME_SCALE = 6; |
| 25 | + |
| 26 | +namespace DB |
| 27 | +{ |
| 28 | + |
| 29 | +namespace Setting |
| 30 | +{ |
| 31 | + extern const SettingsSeconds lock_acquire_timeout; |
| 32 | +} |
| 33 | + |
| 34 | +ColumnsDescription StorageSystemIcebergHistory::getColumnsDescription() |
| 35 | +{ |
| 36 | + return ColumnsDescription |
| 37 | + { |
| 38 | + {"database_name",std::make_shared<DataTypeString>(),"Database name"}, |
| 39 | + {"table_name",std::make_shared<DataTypeString>(),"Table name."}, |
| 40 | + {"made_current_at",std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime64>(TIME_SCALE)),"date & time when this snapshot was made current snapshot"}, |
| 41 | + {"snapshot_id",std::make_shared<DataTypeUInt64>(),"snapshot id which is used to identify a snapshot."}, |
| 42 | + {"parent_id",std::make_shared<DataTypeUInt64>(),"parent id of this snapshot."}, |
| 43 | + {"is_current_ancestor",std::make_shared<DataTypeUInt8>(),"Flag that indicates if this snapshot is an ancestor of the current snapshot."} |
| 44 | + }; |
| 45 | +} |
| 46 | + |
| 47 | +void StorageSystemIcebergHistory::fillData([[maybe_unused]] MutableColumns & res_columns, [[maybe_unused]] ContextPtr context, const ActionsDAG::Node *, std::vector<UInt8>) const |
| 48 | +{ |
| 49 | +#if USE_AVRO |
| 50 | + const auto access = context->getAccess(); |
| 51 | + |
| 52 | + auto add_history_record = [&](const DatabaseTablesIteratorPtr & it, StorageObjectStorage * object_storage) |
| 53 | + { |
| 54 | + if (!access->isGranted(AccessType::SHOW_TABLES, it->databaseName(), it->name())) |
| 55 | + { |
| 56 | + return; |
| 57 | + } |
| 58 | + |
| 59 | + auto * current_metadata = object_storage->getExternalMetadata(); |
| 60 | + |
| 61 | + if (current_metadata && dynamic_cast<IcebergMetadata *>(current_metadata)) |
| 62 | + { |
| 63 | + auto * iceberg_metadata = dynamic_cast<IcebergMetadata *>(current_metadata); |
| 64 | + IcebergMetadata::IcebergHistory iceberg_history_items = iceberg_metadata->getHistory(); |
| 65 | + |
| 66 | + for (auto & iceberg_history_item : iceberg_history_items) |
| 67 | + { |
| 68 | + size_t column_index = 0; |
| 69 | + res_columns[column_index++]->insert(it->databaseName()); |
| 70 | + res_columns[column_index++]->insert(it->name()); |
| 71 | + res_columns[column_index++]->insert(iceberg_history_item.made_current_at); |
| 72 | + res_columns[column_index++]->insert(iceberg_history_item.snapshot_id); |
| 73 | + res_columns[column_index++]->insert(iceberg_history_item.parent_id); |
| 74 | + res_columns[column_index++]->insert(iceberg_history_item.is_current_ancestor); |
| 75 | + } |
| 76 | + } |
| 77 | + }; |
| 78 | + |
| 79 | + const bool show_tables_granted = access->isGranted(AccessType::SHOW_TABLES); |
| 80 | + |
| 81 | + if (show_tables_granted) |
| 82 | + { |
| 83 | + auto databases = DatabaseCatalog::instance().getDatabases(); |
| 84 | + for (const auto &db: databases) |
| 85 | + { |
| 86 | + for (auto iterator = db.second->getLightweightTablesIterator(context); iterator->isValid(); iterator->next()) |
| 87 | + { |
| 88 | + StoragePtr storage = iterator->table(); |
| 89 | + |
| 90 | + TableLockHolder lock = storage->tryLockForShare(context->getCurrentQueryId(), context->getSettingsRef()[Setting::lock_acquire_timeout]); |
| 91 | + if (!lock) |
| 92 | + // Table was dropped while acquiring the lock, skipping table |
| 93 | + continue; |
| 94 | + |
| 95 | + if (auto *object_storage_table = dynamic_cast<StorageObjectStorage *>(storage.get())) |
| 96 | + { |
| 97 | + add_history_record(iterator, object_storage_table); |
| 98 | + } |
| 99 | + } |
| 100 | + } |
| 101 | + } |
| 102 | +#endif |
| 103 | +} |
| 104 | +} |
0 commit comments