Skip to content

Commit 5af7474

Browse files
committed
Read optimization based on Iceberg metadata
1 parent 89d7ee8 commit 5af7474

21 files changed

+465
-64
lines changed

src/Core/Range.cpp

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@
99
namespace DB
1010
{
1111

12+
/// Error codes used in this translation unit; the constants are only declared here.
namespace ErrorCodes
{
    extern const int INCORRECT_DATA;
}
16+
17+
1218
FieldRef::FieldRef(ColumnsWithTypeAndName * columns_, size_t row_idx_, size_t column_idx_)
1319
: Field((*(*columns_)[column_idx_].column)[row_idx_]), columns(columns_), row_idx(row_idx_), column_idx(column_idx_)
1420
{
@@ -151,6 +157,13 @@ bool Range::isInfinite() const
151157
return left.isNegativeInfinity() && right.isPositiveInfinity();
152158
}
153159

160+
/// [x, x]
161+
bool Range::isPoint() const
162+
{
163+
return fullBounded() && left_included && right_included && equals(left, right)
164+
&& !left.isNegativeInfinity() && !left.isPositiveInfinity();
165+
}
166+
154167
bool Range::intersectsRange(const Range & r) const
155168
{
156169
/// r to the left of me.
@@ -332,6 +345,48 @@ String Range::toString() const
332345
return str.str();
333346
}
334347

348+
/// Writes the range as "<open>left,right<close>": '[' / ']' for an included bound,
/// '(' / ')' for an excluded one; each bound is written with its own dump().
String Range::dump() const
{
    const char opening_bracket = left_included ? '[' : '(';
    const char closing_bracket = right_included ? ']' : ')';

    WriteBufferFromOwnString out;
    out << opening_bracket << left.dump() << "," << right.dump() << closing_bracket;
    return out.str();
}
357+
358+
void Range::restoreFromDump(const String & range)
359+
{
360+
if (range.empty())
361+
throw Exception(ErrorCodes::INCORRECT_DATA, "Empty range dump");
362+
363+
if (range[0] == '[')
364+
left_included = true;
365+
else if (range[0] == '(')
366+
left_included = false;
367+
else
368+
throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect range: {}", range);
369+
370+
if (range[range.size() - 1] == ']')
371+
right_included = true;
372+
else if (range[range.size() - 1] == ')')
373+
right_included = false;
374+
else
375+
throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect range: {}", range);
376+
377+
/// TODO: Strings with comma
378+
auto separator = range.find(',');
379+
if (separator == std::string::npos || separator == range.size())
380+
throw Exception(ErrorCodes::INCORRECT_DATA, "Incorrect range: {}", range);
381+
382+
std::string_view l(range.data() + 1, separator - 1);
383+
std::string_view r(range.data() + separator + 1, range.size() - separator - 2);
384+
385+
/// TODO: "Decimal64_'1596962100.000000'" can't be parsed by some reason
386+
left = Field::restoreFromDump(std::string_view(range.data() + 1, separator - 1));
387+
right = Field::restoreFromDump(std::string_view(range.data() + separator + 1, range.size() - separator - 2));
388+
}
389+
335390
Hyperrectangle intersect(const Hyperrectangle & a, const Hyperrectangle & b)
336391
{
337392
size_t result_size = std::min(a.size(), b.size());

src/Core/Range.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ struct Range
9494

9595
bool isBlank() const;
9696

97+
bool isPoint() const;
98+
9799
bool intersectsRange(const Range & r) const;
98100

99101
bool containsRange(const Range & r) const;
@@ -114,6 +116,9 @@ struct Range
114116
bool nearByWith(const Range & r) const;
115117

116118
String toString() const;
119+
120+
String dump() const;
121+
void restoreFromDump(const String & range);
117122
};
118123

119124
Range intersect(const Range & a, const Range & b);

src/Disks/ObjectStorages/IObjectStorage.cpp

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
#include <Common/Exception.h>
99
#include <Common/ObjectStorageKeyGenerator.h>
1010

11+
/// TODO: move DataFileInfo into a separate file
12+
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
13+
1114
#include <Poco/JSON/Object.h>
1215
#include <Poco/JSON/Parser.h>
1316
#include <Poco/JSON/JSONException.h>
@@ -101,6 +104,28 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings
101104
return write_settings;
102105
}
103106

107+
/// Builds the object from a task string, which is either a plain relative path
/// or a serialized command (see CommandInTaskResponse).
RelativePathWithMetadata::RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_)
    : metadata(std::move(metadata_))
    , command(task_string)
{
    /// The string was not recognized as a command: it is the path itself.
    if (!command.isParsed())
    {
        relative_path = task_string;
        return;
    }

    /// A parsed command does not have to carry a path; relative_path is left as is in that case.
    if (const auto path_from_command = command.getFilePath())
        relative_path = *path_from_command;

    file_meta_info = command.getFileMetaInfo();
}
121+
122+
/// Builds the object from a DataFileInfo: the path and the file meta info are copied from `info`,
/// the command is left default-constructed.
RelativePathWithMetadata::RelativePathWithMetadata(const DataFileInfo & info, std::optional<ObjectMetadata> metadata_)
    : relative_path(info.file_path)
    , metadata(std::move(metadata_))
    , file_meta_info(info.file_meta_info)
{
}
128+
104129
void RelativePathWithMetadata::loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file)
105130
{
106131
if (!metadata)
@@ -129,20 +154,29 @@ RelativePathWithMetadata::CommandInTaskResponse::CommandInTaskResponse(const std
129154

130155
successfully_parsed = true;
131156

157+
if (json->has("file_path"))
158+
file_path = json->getValue<std::string>("file_path");
132159
if (json->has("retry_after_us"))
133160
retry_after_us = json->getValue<size_t>("retry_after_us");
161+
if (json->has("meta_info"))
162+
file_meta_info = std::make_shared<DataFileMetaInfo>(json->getObject("meta_info"));
134163
}
135164
catch (const Poco::JSON::JSONException &)
136165
{ /// Not a JSON
137166
return;
138167
}
139168
}
140169

141-
std::string RelativePathWithMetadata::CommandInTaskResponse::to_string() const
170+
std::string RelativePathWithMetadata::CommandInTaskResponse::toString() const
142171
{
143172
Poco::JSON::Object json;
173+
174+
if (file_path.has_value())
175+
json.set("file_path", file_path.value());
144176
if (retry_after_us.has_value())
145177
json.set("retry_after_us", retry_after_us.value());
178+
if (file_meta_info.has_value())
179+
json.set("meta_info", file_meta_info.value()->toJson());
146180

147181
std::ostringstream oss;
148182
oss.exceptions(std::ios::failbit);

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
#include <Poco/Timestamp.h>
1111
#include <Poco/Util/AbstractConfiguration.h>
12+
#include <Poco/JSON/Object.h>
1213
#include <Core/Defines.h>
1314
#include <IO/ReadSettings.h>
1415
#include <IO/WriteSettings.h>
@@ -101,6 +102,10 @@ struct ObjectMetadata
101102
ObjectAttributes attributes;
102103
};
103104

105+
struct DataFileInfo;
106+
class DataFileMetaInfo;
107+
using DataFileMetaInfoPtr = std::shared_ptr<DataFileMetaInfo>;
108+
104109
struct RelativePathWithMetadata
105110
{
106111
class CommandInTaskResponse
@@ -109,31 +114,35 @@ struct RelativePathWithMetadata
109114
CommandInTaskResponse() = default;
110115
explicit CommandInTaskResponse(const std::string & task);
111116

112-
bool is_parsed() const { return successfully_parsed; }
113-
void set_retry_after_us(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }
117+
bool isParsed() const { return successfully_parsed; }
118+
void setFilePath(const std::string & file_path_ ) { file_path = file_path_; }
119+
void setRetryAfterUs(Poco::Timestamp::TimeDiff time_us) { retry_after_us = time_us; }
120+
void setFileMetaInfo(DataFileMetaInfoPtr file_meta_info_ ) { file_meta_info = file_meta_info_; }
121+
122+
std::string toString() const;
123+
124+
std::optional<std::string> getFilePath() const { return file_path; }
114125

115-
std::string to_string() const;
126+
std::optional<Poco::Timestamp::TimeDiff> getRetryAfterUs() const { return retry_after_us; }
116127

117-
std::optional<Poco::Timestamp::TimeDiff> get_retry_after_us() const { return retry_after_us; }
128+
std::optional<DataFileMetaInfoPtr> getFileMetaInfo() const { return file_meta_info; }
118129

119130
private:
120131
bool successfully_parsed = false;
132+
std::optional<std::string> file_path;
121133
std::optional<Poco::Timestamp::TimeDiff> retry_after_us;
134+
std::optional<DataFileMetaInfoPtr> file_meta_info;
122135
};
123136

124137
String relative_path;
125138
std::optional<ObjectMetadata> metadata;
126139
CommandInTaskResponse command;
140+
std::optional<DataFileMetaInfoPtr> file_meta_info;
127141

128142
RelativePathWithMetadata() = default;
129143

130-
explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt)
131-
: metadata(std::move(metadata_))
132-
, command(task_string)
133-
{
134-
if (!command.is_parsed())
135-
relative_path = task_string;
136-
}
144+
explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt);
145+
explicit RelativePathWithMetadata(const DataFileInfo & info, std::optional<ObjectMetadata> metadata_ = std::nullopt);
137146

138147
virtual ~RelativePathWithMetadata() = default;
139148

@@ -143,6 +152,10 @@ struct RelativePathWithMetadata
143152
virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
144153
virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
145154

155+
void setFileMetaInfo(DataFileMetaInfoPtr file_meta_info_ ) { file_meta_info = file_meta_info_; }
156+
void setFileMetaInfo(std::optional<DataFileMetaInfoPtr> file_meta_info_ ) { file_meta_info = file_meta_info_; }
157+
std::optional<DataFileMetaInfoPtr> getFileMetaInfo() const { return file_meta_info; }
158+
146159
void loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file);
147160
const CommandInTaskResponse & getCommand() const { return command; }
148161
};

src/Processors/Chunk.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,12 @@ void Chunk::addColumn(ColumnPtr column)
108108

109109
void Chunk::addColumn(size_t position, ColumnPtr column)
110110
{
111-
if (position >= columns.size())
111+
if (position == columns.size())
112+
{
113+
addColumn(column);
114+
return;
115+
}
116+
if (position > columns.size())
112117
throw Exception(ErrorCodes::POSITION_OUT_OF_BOUND,
113118
"Position {} out of bound in Chunk::addColumn(), max position = {}",
114119
position, !columns.empty() ? columns.size() - 1 : 0);

src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
104104
return std::nullopt;
105105
auto data_files = current_metadata->getDataFiles();
106106
if (!data_files.empty())
107-
return data_files[0];
107+
return data_files[0].file_path;
108108
return std::nullopt;
109109
}
110110

src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ struct DeltaLakeMetadataImpl
158158
struct DeltaLakeMetadata
159159
{
160160
NamesAndTypesList schema;
161-
Strings data_files;
161+
DataFileInfos data_files;
162162
DeltaLakePartitionColumns partition_columns;
163163
};
164164

@@ -195,7 +195,7 @@ struct DeltaLakeMetadataImpl
195195
processMetadataFile(key, current_schema, current_partition_columns, result_files);
196196
}
197197

198-
return DeltaLakeMetadata{current_schema, Strings(result_files.begin(), result_files.end()), current_partition_columns};
198+
return DeltaLakeMetadata{current_schema, DataFileInfos(result_files.begin(), result_files.end()), current_partition_columns};
199199
}
200200

201201
/**

src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class DeltaLakeMetadata final : public IDataLakeMetadata
3535

3636
DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);
3737

38-
Strings getDataFiles() const override { return data_files; }
38+
DataFileInfos getDataFiles() const override { return data_files; }
3939

4040
NamesAndTypesList getTableSchema() const override { return schema; }
4141

@@ -67,12 +67,12 @@ class DeltaLakeMetadata final : public IDataLakeMetadata
6767
ContextPtr context) const override;
6868

6969
private:
70-
mutable Strings data_files;
70+
mutable DataFileInfos data_files;
7171
NamesAndTypesList schema;
7272
DeltaLakePartitionColumns partition_columns;
7373
ObjectStoragePtr object_storage;
7474

75-
Strings getDataFiles(const ActionsDAG *) const { return data_files; }
75+
DataFileInfos getDataFiles(const ActionsDAG *) const { return data_files; }
7676
};
7777

7878
}

src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ bool DeltaLakeMetadataDeltaKernel::update(const ContextPtr &)
3535
return table_snapshot->update();
3636
}
3737

38-
Strings DeltaLakeMetadataDeltaKernel::getDataFiles() const
38+
DataFileInfos DeltaLakeMetadataDeltaKernel::getDataFiles() const
3939
{
4040
throwNotImplemented("getDataFiles()");
4141
}

src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata
3333

3434
bool update(const ContextPtr & context) override;
3535

36-
Strings getDataFiles() const override;
36+
DataFileInfos getDataFiles() const override;
3737

3838
NamesAndTypesList getTableSchema() const override;
3939

0 commit comments

Comments
 (0)