
Commit 56b0d59

Read optimization using Iceberg metadata

1 parent 61c1d5c

File tree

18 files changed: +625 -11 lines

src/Core/ProtocolDefines.h

Lines changed: 2 additions & 1 deletion

@@ -35,7 +35,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54
 static constexpr auto DBMS_CLUSTER_INITIAL_PROCESSING_PROTOCOL_VERSION = 1;
 static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_METADATA = 2;
-static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 2;
+static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA = 3;
+static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 3;
 
 static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3;
 static constexpr auto DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD = 4;
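Bumping DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION to 3 lets the sender know whether a peer understands the new per-file columns metadata; tasks serialized for an older protocol version simply omit the extra section (see the ClusterFunctionReadTask changes below). A minimal, self-contained sketch of this version-gating pattern; the constant name and sendTask function here are illustrative stand-ins, not the actual serializer:

#include <cstdint>
#include <iostream>

// Value mirrors the constant introduced above (shortened name, for illustration only).
static constexpr uint64_t WITH_DATA_LAKE_COLUMNS_METADATA = 3;

// A peer that negotiated an older protocol version never receives the new
// columns-metadata section, so mixed-version clusters keep working.
void sendTask(uint64_t negotiated_version)
{
    if (negotiated_version >= WITH_DATA_LAKE_COLUMNS_METADATA)
        std::cout << "send columns metadata\n";
    else
        std::cout << "skip columns metadata\n";
}

int main()
{
    sendTask(2);  // old replica: section skipped
    sendTask(3);  // new replica: section sent
}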

src/Core/Range.cpp

Lines changed: 9 additions & 0 deletions

@@ -2,13 +2,15 @@
 #include <Core/Range.h>
 #include <IO/Operators.h>
 #include <IO/WriteBufferFromString.h>
+#include <IO/ReadBufferFromString.h>
 #include <Common/FieldVisitorToString.h>
 #include <Common/FieldAccurateComparison.h>
 
 
 namespace DB
 {
 
+
 FieldRef::FieldRef(ColumnsWithTypeAndName * columns_, size_t row_idx_, size_t column_idx_)
     : Field((*(*columns_)[column_idx_].column)[row_idx_]), columns(columns_), row_idx(row_idx_), column_idx(column_idx_)
 {
@@ -151,6 +153,13 @@ bool Range::isInfinite() const
     return left.isNegativeInfinity() && right.isPositiveInfinity();
 }
 
+/// [x, x]
+bool Range::isPoint() const
+{
+    return fullBounded() && left_included && right_included && equals(left, right)
+        && !left.isNegativeInfinity() && !left.isPositiveInfinity();
+}
+
 bool Range::intersectsRange(const Range & r) const
 {
     /// r to the left of me.

src/Core/Range.h

Lines changed: 2 additions & 0 deletions

@@ -94,6 +94,8 @@ struct Range
 
     bool isBlank() const;
 
+    bool isPoint() const;
+
     bool intersectsRange(const Range & r) const;
 
     bool containsRange(const Range & r) const;
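Range::isPoint() reports whether a range denotes exactly one value, presumably so that per-file Iceberg column statistics can be compared against equality conditions during read planning. A hedged usage sketch; it assumes the pre-existing Range constructors for a single point and for a bounded interval, which this commit does not touch:

#include <Core/Range.h>

using namespace DB;

void example()
{
    Range point(Field(UInt64(42)));                                   /// [42, 42]
    Range interval(Field(UInt64(1)), true, Field(UInt64(10)), true);  /// [1, 10]

    bool is_point = point.isPoint();        /// true: both bounds included and equal
    bool is_interval = interval.isPoint();  /// false: bounds differ
    (void)is_point;
    (void)is_interval;
}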

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions

@@ -7024,6 +7024,9 @@ DECLARE(Bool, allow_experimental_ytsaurus_dictionary_source, false, R"(
 )", EXPERIMENTAL) \
     DECLARE(Bool, distributed_plan_force_shuffle_aggregation, false, R"(
 Use Shuffle aggregation strategy instead of PartialAggregation + Merge in distributed query plan.
+)", EXPERIMENTAL) \
+    DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"(
+Allow Iceberg read optimization based on Iceberg metadata.
 )", EXPERIMENTAL) \
     \
     /** Experimental timeSeries* aggregate functions. */ \
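The new allow_experimental_iceberg_read_optimization setting is ON by default but still classified EXPERIMENTAL. Bool settings like this are normally consulted through the query context, as the existing cluster_function_process_archive_on_multiple_nodes check further below does; where exactly this particular flag is read is not part of this hunk, so the helper shown here is only an assumed sketch of that pattern:

#include <Interpreters/Context.h>
#include <Core/Settings.h>

namespace DB
{

namespace Setting
{
    extern const SettingsBool allow_experimental_iceberg_read_optimization;
}

/// Hypothetical helper: returns whether Iceberg metadata may be used to optimize reads.
bool icebergReadOptimizationEnabled(const ContextPtr & context)
{
    return context->getSettingsRef()[Setting::allow_experimental_iceberg_read_optimization];
}

}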

src/Core/SettingsChangesHistory.cpp

Lines changed: 12 additions & 0 deletions

@@ -132,6 +132,18 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
             {"distributed_plan_force_shuffle_aggregation", 0, 0, "New experimental setting"},
             {"allow_experimental_insert_into_iceberg", false, false, "New setting."},
             /// RELEASE CLOSED
+            {"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"},
+            {"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"},
+            {"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"},
+            {"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"},
+            {"lock_object_storage_task_distribution_ms", 0, 0, "New setting."},
+            {"object_storage_cluster", "", "", "New setting"},
+            {"object_storage_max_nodes", 0, 0, "New setting"},
+            {"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
+            {"object_storage_remote_initiator", false, false, "New setting."},
+            {"allow_experimental_export_merge_tree_part", false, false, "New setting."},
+            {"allow_experimental_iceberg_read_optimization", true, true, "New setting."},
+            {"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
         });
     addSettingsChanges(settings_changes_history, "25.6",
     {

src/Disks/ObjectStorages/IObjectStorage.cpp

Lines changed: 8 additions & 1 deletion

@@ -7,7 +7,7 @@
 #include <Interpreters/Context.h>
 #include <Common/Exception.h>
 #include <Common/ObjectStorageKeyGenerator.h>
-
+#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
 
 namespace DB
 {
@@ -97,6 +97,13 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings
     return write_settings;
 }
 
+RelativePathWithMetadata::RelativePathWithMetadata(const DataFileInfo & info, std::optional<ObjectMetadata> metadata_)
+    : metadata(std::move(metadata_))
+{
+    relative_path = info.file_path;
+    file_meta_info = info.file_meta_info;
+}
+
 std::string RelativePathWithMetadata::getPathOrPathToArchiveIfArchive() const
 {
     if (isArchive())

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 12 additions & 0 deletions

@@ -9,6 +9,7 @@
 
 #include <Poco/Timestamp.h>
 #include <Poco/Util/AbstractConfiguration.h>
+#include <Poco/JSON/Object.h>
 #include <Core/Defines.h>
 #include <IO/ReadSettings.h>
 #include <IO/WriteSettings.h>
@@ -107,6 +108,11 @@ struct ObjectMetadata
     ObjectAttributes attributes;
 };
 
+
+struct DataFileInfo;
+class DataFileMetaInfo;
+using DataFileMetaInfoPtr = std::shared_ptr<DataFileMetaInfo>;
+
 struct DataLakeObjectMetadata;
 
 struct RelativePathWithMetadata
@@ -116,13 +122,16 @@ struct RelativePathWithMetadata
     std::optional<ObjectMetadata> metadata;
     /// Delta lake related object metadata.
     std::optional<DataLakeObjectMetadata> data_lake_metadata;
+    /// Information about columns
+    std::optional<DataFileMetaInfoPtr> file_meta_info;
 
     RelativePathWithMetadata() = default;
 
     explicit RelativePathWithMetadata(String relative_path_, std::optional<ObjectMetadata> metadata_ = std::nullopt)
         : relative_path(std::move(relative_path_))
         , metadata(std::move(metadata_))
     {}
+    explicit RelativePathWithMetadata(const DataFileInfo & info, std::optional<ObjectMetadata> metadata_ = std::nullopt);
 
     RelativePathWithMetadata(const RelativePathWithMetadata & other) = default;
 
@@ -134,6 +143,9 @@ struct RelativePathWithMetadata
     virtual std::string getPathToArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
     virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
     virtual std::string getPathOrPathToArchiveIfArchive() const;
+
+    void setFileMetaInfo(std::optional<DataFileMetaInfoPtr> file_meta_info_) { file_meta_info = file_meta_info_; }
+    std::optional<DataFileMetaInfoPtr> getFileMetaInfo() const { return file_meta_info; }
 };
 
 struct ObjectKeyWithMetadata
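Each listed object can now carry optional per-file column metadata next to its path. A hedged sketch of how a data-lake listing might attach it and a reader consume it; DataFileInfo is only forward-declared here, so the assumed public file_path/file_meta_info members are inferred from the new constructor's definition in IObjectStorage.cpp above:

#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>

using namespace DB;

void attachAndConsume()
{
    /// Assumption: DataFileInfo exposes file_path and file_meta_info as assignable members.
    DataFileInfo info;
    info.file_path = "data/part-00000.parquet";
    info.file_meta_info = std::make_shared<DataFileMetaInfo>();  /// filled from Iceberg manifests elsewhere

    RelativePathWithMetadata object(info);   /// new constructor added by this commit
    if (auto meta = object.getFileMetaInfo())
    {
        /// Column-level statistics are present for this file; a reader can use
        /// them to skip the file when a predicate cannot match.
    }
}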

src/Interpreters/ClusterFunctionReadTask.cpp

Lines changed: 19 additions & 0 deletions

@@ -29,6 +29,8 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o
     if (object->data_lake_metadata.has_value())
         data_lake_metadata = object->data_lake_metadata.value();
 
+    file_meta_info = object->file_meta_info;
+
     const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes];
     path = send_over_whole_archive ? object->getPathOrPathToArchiveIfArchive() : object->getPath();
 }
@@ -45,6 +47,7 @@ ObjectInfoPtr ClusterFunctionReadTaskResponse::getObjectInfo() const
 
     auto object = std::make_shared<ObjectInfo>(path);
     object->data_lake_metadata = data_lake_metadata;
+    object->file_meta_info = file_meta_info;
     return object;
 }
 
@@ -61,6 +64,14 @@ void ClusterFunctionReadTaskResponse::serialize(WriteBuffer & out, size_t protoc
         else
             ActionsDAG().serialize(out, registry);
     }
+
+    if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA)
+    {
+        if (file_meta_info.has_value())
+            file_meta_info.value()->serialize(out);
+        else
+            DataFileMetaInfo().serialize(out);
+    }
 }
 
 void ClusterFunctionReadTaskResponse::deserialize(ReadBuffer & in)
@@ -87,6 +98,14 @@ void ClusterFunctionReadTaskResponse::deserialize(ReadBuffer & in)
             data_lake_metadata.transform = std::move(transform);
         }
     }
+
+    if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA)
+    {
+        auto info = std::make_shared<DataFileMetaInfo>(DataFileMetaInfo::deserialize(in));
+
+        if (!path.empty() && !info->empty())
+            file_meta_info = info;
+    }
 }
 
 }
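Note the compatibility trick: once the protocol version allows it, the writer always emits a DataFileMetaInfo record, serializing an empty one when there is no columns info, and the reader only keeps the deserialized object when the task has a path and the record is non-empty. A minimal, self-contained sketch of that "empty sentinel" pattern with plain standard-library types (WriteBuffer/ReadBuffer and DataFileMetaInfo's real wire format are not reproduced here):

#include <optional>
#include <sstream>
#include <string>

// Stand-in for DataFileMetaInfo: an empty payload means "no columns info".
struct MetaStub
{
    std::string payload;
    bool empty() const { return payload.empty(); }

    void serialize(std::ostream & out) const { out << payload << '\n'; }
    static MetaStub deserialize(std::istream & in)
    {
        MetaStub m;
        std::getline(in, m.payload);
        return m;
    }
};

int main()
{
    std::stringstream wire;

    std::optional<MetaStub> to_send;                     // nothing to send this time
    (to_send ? *to_send : MetaStub{}).serialize(wire);   // always write something

    auto received = MetaStub::deserialize(wire);
    std::optional<MetaStub> kept;
    if (!received.empty())                               // only keep real metadata
        kept = received;
}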

src/Interpreters/ClusterFunctionReadTask.h

Lines changed: 2 additions & 0 deletions

@@ -20,6 +20,8 @@ struct ClusterFunctionReadTaskResponse
     String path;
     /// Object metadata path, in case of data lake object.
     DataLakeObjectMetadata data_lake_metadata;
+    /// File's columns info
+    std::optional<DataFileMetaInfoPtr> file_meta_info;
 
     /// Convert received response into ObjectInfo.
     ObjectInfoPtr getObjectInfo() const;

src/Processors/Chunk.cpp

Lines changed: 6 additions & 1 deletion

@@ -108,7 +108,12 @@ void Chunk::addColumn(ColumnPtr column)
 
 void Chunk::addColumn(size_t position, ColumnPtr column)
 {
-    if (position >= columns.size())
+    if (position == columns.size())
+    {
+        addColumn(column);
+        return;
+    }
+    if (position > columns.size())
         throw Exception(ErrorCodes::POSITION_OUT_OF_BOUND,
             "Position {} out of bound in Chunk::addColumn(), max position = {}",
             position, !columns.empty() ? columns.size() - 1 : 0);
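Before this change, addColumn(position, column) threw POSITION_OUT_OF_BOUND for any position >= columns.size(); now position == columns.size() appends by delegating to the single-argument overload, and only position > columns.size() throws. A hedged usage sketch (the columns are left empty so the chunk's row-count invariant is trivially satisfied):

#include <Processors/Chunk.h>
#include <Columns/ColumnsNumber.h>

using namespace DB;

void example()
{
    Chunk chunk;
    chunk.addColumn(ColumnUInt64::create());         /// chunk now holds 1 column
    chunk.addColumn(1, ColumnUInt64::create());      /// position == size: appends (previously threw)
    /// chunk.addColumn(3, ColumnUInt64::create());  /// position > size: still throws POSITION_OUT_OF_BOUND
}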
