forked from ClickHouse/ClickHouse
-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathIDataLakeMetadata.h
More file actions
230 lines (185 loc) · 8.48 KB
/
IDataLakeMetadata.h
File metadata and controls
230 lines (185 loc) · 8.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
#pragma once
#include <boost/noncopyable.hpp>
#include <Core/NamesAndTypes.h>
#include <Core/Types.h>
#include <Core/Range.h>
#include <Databases/DataLake/ICatalog.h>
#include <Formats/FormatFilterInfo.h>
#include <Formats/FormatParserSharedResources.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/StorageID.h>
#include <Processors/ISimpleTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/AlterCommands.h>
#include <Storages/MutationCommands.h>
#include <Storages/prepareReadingFromFormat.h>
#include <Storages/ObjectStorage/DataLakes/Iceberg/SchemaProcessor.h>
/// Forward declaration of the catalog type. This header only names it through
/// std::shared_ptr<DataLake::ICatalog> parameters.
namespace DataLake { class ICatalog; }
namespace DB
{
/// Error codes used by the inline code of this header.
/// Both are only declared here (`extern`); their definitions live elsewhere in the project.
/// NOT_IMPLEMENTED is used by IDataLakeMetadata::truncate, UNSUPPORTED_METHOD by
/// IDataLakeMetadata::throwNotImplemented. Declaring both keeps the header from
/// depending on another include to introduce them.
namespace ErrorCodes
{
    extern const int NOT_IMPLEMENTED;
    extern const int UNSUPPORTED_METHOD;
}
/// Forward declaration of the Iceberg per-column info type, which
/// DataFileMetaInfo takes by reference inside an unordered_map.
namespace Iceberg
{
    struct ColumnInfo;
}
/// Metadata attached to a single data file: a map from a string key
/// (presumably the column name - confirm in the implementation) to per-column info.
/// Only `empty()` is defined inline; every other member function is defined out of line.
class DataFileMetaInfo
{
public:
    /// Per-column info. Currently a subset of Iceberg::ColumnInfo.
    /// Every field is optional.
    struct ColumnInfo
    {
        std::optional<Int64> rows_count;
        std::optional<Int64> nulls_count;
        std::optional<DB::Range> hyperrectangle;
    };

    DataFileMetaInfo() = default;

    /// Deserialize from json in distributed requests.
    explicit DataFileMetaInfo(const Poco::JSON::Object::Ptr file_info);

    /// Extract metadata from Iceberg structure.
    explicit DataFileMetaInfo(
        const Iceberg::IcebergSchemaProcessor & schema_processor,
        Int32 schema_id,
        const std::unordered_map<Int32, Iceberg::ColumnInfo> & columns_info_);

    /// Serialize to json in distributed requests.
    Poco::JSON::Object::Ptr toJson() const;

    /// Buffer-based counterparts of the JSON conversion above.
    void serialize(WriteBuffer & out) const;
    static DataFileMetaInfo deserialize(ReadBuffer & in);

    /// True when no column info is stored.
    bool empty() const { return columns_info.empty(); }

    std::unordered_map<std::string, ColumnInfo> columns_info;
};
/// Shared-ownership pointer to a DataFileMetaInfo.
using DataFileMetaInfoPtr = std::shared_ptr<DataFileMetaInfo>;
/// Path of a data file plus optional metadata about it.
/// Two objects compare equal when their paths are equal; `file_meta_info`
/// does not take part in the comparison.
struct DataFileInfo
{
    /// Copies the given path. `file_meta_info` is left disengaged.
    explicit DataFileInfo(const std::string & file_path_) : file_path(file_path_) { }

    /// Takes over the given path. `file_meta_info` is left disengaged.
    explicit DataFileInfo(std::string && file_path_) : file_path(std::move(file_path_)) { }

    bool operator==(const DataFileInfo & rhs) const { return file_path == rhs.file_path; }

    std::string file_path;
    std::optional<DataFileMetaInfoPtr> file_meta_info;
};
/// Forward declarations and shared-pointer aliases for types that the interface below
/// names only in parameter and return types.
class SinkToStorage;
using SinkToStoragePtr = std::shared_ptr<SinkToStorage>;
class StorageObjectStorageConfiguration;
using StorageObjectStorageConfigurationPtr = std::shared_ptr<StorageObjectStorageConfiguration>;
/// <Interpreters/StorageID.h> is also included at the top of this file.
struct StorageID;
struct IObjectIterator;
class IObjectStorage;
struct ObjectInfo;
using ObjectInfoPtr = std::shared_ptr<ObjectInfo>;
using ObjectIterator = std::shared_ptr<IObjectIterator>;
using ObjectStoragePtr = std::shared_ptr<IObjectStorage>;
class IDataLakeMetadata : boost::noncopyable
{
public:
virtual ~IDataLakeMetadata() = default;
virtual bool operator==(const IDataLakeMetadata & other) const = 0;
/// Return iterator to `data files`.
using FileProgressCallback = std::function<void(FileProgress)>;
virtual ObjectIterator iterate(
const ActionsDAG * /* filter_dag */,
FileProgressCallback /* callback */,
size_t /* list_batch_size */,
StorageMetadataPtr storage_metadata,
ContextPtr context) const
= 0;
/// Table schema from data lake metadata.
virtual NamesAndTypesList getTableSchema(ContextPtr local_context) const = 0;
virtual StorageInMemoryMetadata getStorageSnapshotMetadata(ContextPtr) const { throwNotImplemented("getStorageSnapshotMetadata"); }
/// Read schema is the schema of actual data files,
/// which can differ from table schema from data lake metadata.
/// Return nothing if read schema is the same as table schema.
virtual ReadFromFormatInfo prepareReadingFromFormat(
const Strings & requested_columns,
const StorageSnapshotPtr & storage_snapshot,
const ContextPtr & context,
bool supports_subset_of_columns,
bool supports_tuple_elements);
virtual std::shared_ptr<NamesAndTypesList> getInitialSchemaByPath(ContextPtr, ObjectInfoPtr) const { return {}; }
virtual std::shared_ptr<const ActionsDAG> getSchemaTransformer(ContextPtr, ObjectInfoPtr) const { return {}; }
/// Whether current metadata object is updateable (instead of recreation from scratch)
/// to the latest version of table state in data lake.
virtual bool supportsUpdate() const { return false; }
/// Update metadata to the latest version.
virtual void update(const ContextPtr &) { }
virtual bool supportsWrites() const { return false; }
virtual bool supportsParallelInsert() const { return false; }
virtual void modifyFormatSettings(FormatSettings &, const Context &) const {}
virtual bool supportsTruncate() const { return false; }
virtual void truncate(ContextPtr /*context*/, std::shared_ptr<DataLake::ICatalog> /*catalog*/, const StorageID & /*storage_id*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncation is not supported by {} metadata", getName());
}
static constexpr bool supportsTotalRows() { return false; }
virtual std::optional<size_t> totalRows(ContextPtr) const { return {}; }
static constexpr bool supportsTotalBytes() { return false; }
virtual std::optional<size_t> totalBytes(ContextPtr) const { return {}; }
/// Data which we are going to read is sorted by sorting key specified in StorageMetadataPtr.
/// For example in Iceberg it's a valid query to change sort_order for table, but older files will
/// not be rewritten and will be left unsorted or with previous sort order.
/// In this case we shouldn't use read in order optimization.
virtual bool isDataSortedBySortingKey(StorageMetadataPtr, ContextPtr) const { return false; }
/// Some data lakes specify information for reading files from disks.
/// For example, Iceberg has Parquet schema field ids in its metadata for reading files.
virtual ColumnMapperPtr getColumnMapperForObject(ObjectInfoPtr /**/) const { return nullptr; }
virtual ColumnMapperPtr getColumnMapperForCurrentSchema(StorageMetadataPtr, ContextPtr) const { return nullptr; }
virtual SinkToStoragePtr write(
SharedHeader /*sample_block*/,
const StorageID & /*table_id*/,
ObjectStoragePtr /*object_storage*/,
StorageObjectStorageConfigurationPtr /*configuration*/,
const std::optional<FormatSettings> & /*format_settings*/,
ContextPtr /*context*/,
std::shared_ptr<DataLake::ICatalog> /*catalog*/)
{
throwNotImplemented("write");
}
virtual bool optimize(
const StorageMetadataPtr & /*metadata_snapshot*/, ContextPtr /*context*/, const std::optional<FormatSettings> & /*format_settings*/)
{
return false;
}
virtual bool supportsDelete() const { return false; }
virtual void mutate(
const MutationCommands & /*commands*/,
StorageObjectStorageConfigurationPtr /*configuration*/,
ContextPtr /*context*/,
const StorageID & /*storage_id*/,
StorageMetadataPtr /*metadata_snapshot*/,
std::shared_ptr<DataLake::ICatalog> /*catalog*/,
const std::optional<FormatSettings> & /*format_settings*/)
{
throwNotImplemented("mutations");
}
virtual void checkMutationIsPossible(const MutationCommands & /*commands*/) { throwNotImplemented("mutations"); }
virtual void addDeleteTransformers(ObjectInfoPtr, QueryPipelineBuilder &, const std::optional<FormatSettings> &, FormatParserSharedResourcesPtr, ContextPtr) const { }
virtual void checkAlterIsPossible(const AlterCommands & /*commands*/) { throwNotImplemented("alter"); }
virtual void alter(const AlterCommands & /*params*/, ContextPtr /*context*/) { throwNotImplemented("alter"); }
virtual void drop(ContextPtr) { }
virtual std::optional<String> partitionKey(ContextPtr) const { return {}; }
virtual std::optional<String> sortingKey(ContextPtr) const { return {}; }
protected:
virtual ObjectIterator
createKeysIterator(Strings && data_files_, ObjectStoragePtr object_storage_, IDataLakeMetadata::FileProgressCallback callback_) const;
ObjectIterator createKeysIterator(
Strings && data_files_,
ObjectStoragePtr object_storage_,
IDataLakeMetadata::FileProgressCallback callback_,
UInt64 snapshot_version_) const;
[[noreturn]] void throwNotImplemented(std::string_view method) const
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Method `{}` is not implemented for {}", method, getName());
}
virtual const char * getName() const = 0;
};
/// Unique-ownership pointer to a data lake metadata implementation.
using DataLakeMetadataPtr = std::unique_ptr<IDataLakeMetadata>;
}