forked from ClickHouse/ClickHouse
-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathIcebergMetadata.h
More file actions
178 lines (138 loc) · 8.01 KB
/
IcebergMetadata.h
File metadata and controls
178 lines (138 loc) · 8.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#pragma once
#include "config.h"
#if USE_AVRO
#include <Poco/JSON/Array.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Core/Types.h>
#include <Disks/DiskObjectStorage/ObjectStorages/IObjectStorage.h>
#include <Interpreters/Context_fwd.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Snapshot.h>
#include <optional>
#include <base/defines.h>
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergMetadataFilesCache.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <IO/CompressionMethod.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergDataObjectInfo.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergIterator.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/IcebergTableStateSnapshot.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/PersistentTableComponents.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/StatelessMetadataFileGetter.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/Utils.h>
#include <Storages/ObjectStorage/Utils.h>
namespace DB
{
/// IDataLakeMetadata implementation for Apache Iceberg tables.
///
/// The class declares two groups of operations:
/// - the read path: schema, snapshot/state resolution, and data-object iteration;
/// - the write/maintenance path: insert sink, mutations, truncate, alter, optimize and drop.
/// Most member functions are only declared here; their definitions are outside this header.
class IcebergMetadata : public IDataLakeMetadata
{
public:
    /// Table history as a list of Iceberg::IcebergHistoryRecord entries (see getHistory()).
    using IcebergHistory = std::vector<Iceberg::IcebergHistoryRecord>;

    /// Name reported by getName().
    static constexpr auto name = "Iceberg";

    const char * getName() const override { return name; }

    /// @param object_storage_ storage that holds the table; kept in `object_storage`.
    /// @param configuration_  object-storage configuration of the table.
    /// @param context_        context used while loading metadata.
    /// @param cache_ptr       cache for Iceberg metadata files.
    IcebergMetadata(
        ObjectStoragePtr object_storage_,
        StorageObjectStorageConfigurationPtr configuration_,
        const ContextPtr & context_,
        IcebergMetadataFilesCachePtr cache_ptr);

    /// Get table schema parsed from metadata.
    NamesAndTypesList getTableSchema(ContextPtr local_context) const override;

    /// Storage metadata (StorageInMemoryMetadata) describing the table snapshot seen by `local_context`.
    StorageInMemoryMetadata getStorageSnapshotMetadata(ContextPtr local_context) const override;

    /// Always reports inequality, regardless of `other`.
    /// NOTE(review): presumably deliberate, so that two metadata instances are never considered
    /// interchangeable; confirm against the IDataLakeMetadata callers before changing.
    bool operator==(const IDataLakeMetadata & /*other*/) const override { return false; }

    /// Presumably writes the initial Iceberg metadata for a new table built from `columns`,
    /// `partition_by` and `order_by`; `if_not_exists` and the optional `catalog` are handled
    /// by the implementation (not visible here) — confirm details there.
    static void createInitial(
        const ObjectStoragePtr & object_storage,
        const StorageObjectStorageConfigurationWeakPtr & configuration,
        const ContextPtr & local_context,
        const std::optional<ColumnsDescription> & columns,
        ASTPtr partition_by,
        ASTPtr order_by,
        bool if_not_exists,
        std::shared_ptr<DataLake::ICatalog> catalog,
        const StorageID & table_id_);

    /// Factory returning the metadata object behind the generic DataLakeMetadataPtr.
    /// Note that the configuration is taken as a weak pointer.
    static DataLakeMetadataPtr create(
        const ObjectStoragePtr & object_storage,
        const StorageObjectStorageConfigurationWeakPtr & configuration,
        const ContextPtr & local_context);

    /// Schema associated with a single data object (looked up by its path/object info).
    std::shared_ptr<NamesAndTypesList> getInitialSchemaByPath(ContextPtr local_context, ObjectInfoPtr object_info) const override;

    /// ActionsDAG for a single data object; presumably converts the object's schema to the
    /// current table schema — confirm in the implementation.
    std::shared_ptr<const ActionsDAG> getSchemaTransformer(ContextPtr local_context, ObjectInfoPtr object_info) const override;

    /// Parses the table schema out of the metadata JSON object. `schema_processor` is passed by
    /// non-const reference and may be updated.
    /// NOTE(review): the returned Int32 is presumably the current schema id; confirm.
    static Int32 parseTableSchema(
        const Poco::JSON::Object::Ptr & metadata_object,
        Iceberg::IcebergSchemaProcessor & schema_processor,
        ContextPtr context_,
        LoggerPtr metadata_logger);

    /// Capability flags: every one of these operations is supported by this implementation.
    bool supportsUpdate() const override { return true; }
    bool supportsWrites() const override { return true; }
    bool supportsParallelInsert() const override { return true; }
    bool supportsTruncate() const override { return true; }

    void truncate(ContextPtr context, std::shared_ptr<DataLake::ICatalog> catalog, const StorageID & storage_id) override;

    /// History records of the table (see IcebergHistory).
    IcebergHistory getHistory(ContextPtr local_context) const;

    /// Compile-time marker (static, constexpr) that totalRows() is implemented.
    static constexpr bool supportsTotalRows() { return true; }
    /// Total row count, or std::nullopt when it cannot be determined.
    std::optional<size_t> totalRows(ContextPtr local_context) const override;

    /// Compile-time marker (static, constexpr) that totalBytes() is implemented.
    static constexpr bool supportsTotalBytes() { return true; }
    /// Total byte size, or std::nullopt when it cannot be determined.
    std::optional<size_t> totalBytes(ContextPtr local_context) const override;

    bool isDataSortedBySortingKey(StorageMetadataPtr storage_metadata_snapshot, ContextPtr context) const override;

    /// Column mappers: one for a specific data object, one for the current table schema.
    ColumnMapperPtr getColumnMapperForObject(ObjectInfoPtr object_info) const override;
    ColumnMapperPtr getColumnMapperForCurrentSchema(StorageMetadataPtr storage_metadata_snapshot, ContextPtr context) const override;

    /// Creates the sink used for INSERT. The configuration argument is intentionally unnamed
    /// in this declaration.
    SinkToStoragePtr write(
        SharedHeader sample_block,
        const StorageID & table_id,
        ObjectStoragePtr object_storage,
        StorageObjectStorageConfigurationPtr /*configuration*/,
        const std::optional<FormatSettings> & format_settings,
        ContextPtr context,
        std::shared_ptr<DataLake::ICatalog> catalog) override;

    /// Compression method of the metadata files, as stored in `persistent_components`.
    CompressionMethod getCompressionMethod() const { return persistent_components.metadata_compression_method; }

    bool optimize(const StorageMetadataPtr & metadata_snapshot, ContextPtr context, const std::optional<FormatSettings> & format_settings) override;

    bool supportsDelete() const override { return true; }

    void mutate(
        const MutationCommands & commands,
        StorageObjectStorageConfigurationPtr configuration,
        ContextPtr context,
        const StorageID & storage_id,
        StorageMetadataPtr metadata_snapshot,
        std::shared_ptr<DataLake::ICatalog> catalog,
        const std::optional<FormatSettings> & format_settings) override;

    void checkMutationIsPossible(const MutationCommands & commands) override;

    void modifyFormatSettings(FormatSettings & format_settings, const Context & local_context) const override;

    /// Adds transforms to `builder` for the given data object.
    /// NOTE(review): presumably applies Iceberg delete files to the read pipeline; confirm.
    void addDeleteTransformers(ObjectInfoPtr object_info, QueryPipelineBuilder & builder, const std::optional<FormatSettings> & format_settings, FormatParserSharedResourcesPtr parser_shared_resources, ContextPtr local_context) const override;

    void checkAlterIsPossible(const AlterCommands & commands) override;
    void alter(const AlterCommands & params, ContextPtr context) override;

    /// Iterator over the table's data objects. `filter_dag` may be null (raw pointer);
    /// NOTE(review): presumably used for pruning, with `list_batch_size` controlling listing batches.
    ObjectIterator iterate(
        const ActionsDAG * filter_dag,
        FileProgressCallback callback,
        size_t list_batch_size,
        StorageMetadataPtr storage_metadata,
        ContextPtr local_context) const override;

    void drop(ContextPtr context) override;

    /// Iterator over an explicit list of data files; `data_files_` is taken by rvalue reference.
    ObjectIterator createIcebergKeysIterator(
        Strings && data_files_,
        ObjectStoragePtr,
        IDataLakeMetadata::FileProgressCallback callback_,
        ContextPtr local_context);

    /// Textual partition/sorting key of the table, if any.
    std::optional<String> partitionKey(ContextPtr) const override;
    std::optional<String> sortingKey(ContextPtr) const override;

private:
    /// Builds the value held in `persistent_components` (presumably called from the constructor).
    Iceberg::PersistentTableComponents initializePersistentTableComponents(
        StorageObjectStorageConfigurationPtr configuration, IcebergMetadataFilesCachePtr cache_ptr, ContextPtr context_);

    /// Data-snapshot resolution helpers: by snapshot id within a metadata object, or from a
    /// snapshot JSON object.
    Iceberg::IcebergDataSnapshotPtr
    getIcebergDataSnapshot(Poco::JSON::Object::Ptr metadata_object, Int64 snapshot_id, ContextPtr local_context) const;
    Iceberg::IcebergDataSnapshotPtr createIcebergDataSnapshotFromSnapshotJSON(Poco::JSON::Object::Ptr snapshot_object, Int64 snapshot_id, ContextPtr local_context) const;

    /// State resolution helpers returning the data snapshot paired with extra state
    /// (an Int32, or an Iceberg::TableStateSnapshot).
    std::pair<Iceberg::IcebergDataSnapshotPtr, Int32>
    getStateImpl(const ContextPtr & local_context, Poco::JSON::Object::Ptr metadata_object) const;
    std::pair<Iceberg::IcebergDataSnapshotPtr, Iceberg::TableStateSnapshot>
    getState(const ContextPtr & local_context, const String & metadata_path, Int32 metadata_version) const;
    Iceberg::IcebergDataSnapshotPtr
    getRelevantDataSnapshotFromTableStateSnapshot(Iceberg::TableStateSnapshot table_state_snapshot, ContextPtr local_context) const;
    std::pair<Iceberg::IcebergDataSnapshotPtr, Iceberg::TableStateSnapshot> getRelevantState(const ContextPtr & context) const;

    /// Key helpers evaluated against a specific table state snapshot.
    std::optional<String> getPartitionKey(ContextPtr local_context, Iceberg::TableStateSnapshot actual_table_state_snapshot) const;
    KeyDescription getSortingKey(ContextPtr local_context, Iceberg::TableStateSnapshot actual_table_state_snapshot) const;

    LoggerPtr log;
    const ObjectStoragePtr object_storage;
    /// `mutable`: may be updated from const member functions.
    mutable std::shared_ptr<SecondaryStorages> secondary_storages; // Sometimes data or manifests can be located on another storage
    Iceberg::PersistentTableComponents persistent_components;
    /// Held by reference: the referenced settings object must outlive this IcebergMetadata.
    /// NOTE(review): the owner of these settings is not visible in this header; confirm lifetime.
    const DataLakeStorageSettings & data_lake_settings;
    const String write_format;
};
}
#endif