Skip to content

Commit 7c5fd55

Browse files
authored
Merge pull request #1069 from Altinity/frontport/antalya-25.8/optimize_count_in_datalake
25.8 Antalya ports: Read optimization using Iceberg metadata
2 parents 1dc8e15 + 83012cb commit 7c5fd55

20 files changed: +637 additions, −22 deletions

src/Core/ProtocolDefines.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54
3535

3636
static constexpr auto DBMS_CLUSTER_INITIAL_PROCESSING_PROTOCOL_VERSION = 1;
3737
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_METADATA = 2;
38-
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 2;
38+
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA = 3;
39+
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 3;
3940

4041
static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3;
4142
static constexpr auto DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD = 4;

src/Core/Range.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,13 @@ bool Range::isInfinite() const
151151
return left.isNegativeInfinity() && right.isPositiveInfinity();
152152
}
153153

154+
/// [x, x]
155+
bool Range::isPoint() const
156+
{
157+
return fullBounded() && left_included && right_included && equals(left, right)
158+
&& !left.isNegativeInfinity() && !left.isPositiveInfinity();
159+
}
160+
154161
bool Range::intersectsRange(const Range & r) const
155162
{
156163
/// r to the left of me.

src/Core/Range.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ struct Range
9494

9595
bool isBlank() const;
9696

97+
bool isPoint() const;
98+
9799
bool intersectsRange(const Range & r) const;
98100

99101
bool containsRange(const Range & r) const;

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7062,6 +7062,9 @@ DECLARE(Bool, allow_experimental_ytsaurus_dictionary_source, false, R"(
70627062
)", EXPERIMENTAL) \
70637063
DECLARE(Bool, distributed_plan_force_shuffle_aggregation, false, R"(
70647064
Use Shuffle aggregation strategy instead of PartialAggregation + Merge in distributed query plan.
7065+
)", EXPERIMENTAL) \
7066+
DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"(
7067+
Allow Iceberg read optimization based on Iceberg metadata.
70657068
)", EXPERIMENTAL) \
70667069
DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
70677070
Allow retries in cluster request, when one node goes offline

src/Core/SettingsChangesHistory.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,12 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
4141
/// Note: please check if the key already exists to prevent duplicate entries.
4242
addSettingsChanges(settings_changes_history, "25.8.9.2000",
4343
{
44+
{"allow_experimental_iceberg_read_optimization", true, true, "New setting."},
4445
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
4546
{"lock_object_storage_task_distribution_ms", 500, 500, "Raised the value to 500 to avoid hoping tasks between executors."},
46-
{"object_storage_cluster", "", "", "Antalya: New setting"},
47-
{"object_storage_max_nodes", 0, 0, "Antalya: New setting"},
48-
{"allow_retries_in_cluster_requests", false, false, "Antalya: New setting"},
47+
{"object_storage_cluster", "", "", "New setting"},
48+
{"object_storage_max_nodes", 0, 0, "New setting"},
49+
{"allow_retries_in_cluster_requests", false, false, "New setting"},
4950
});
5051
addSettingsChanges(settings_changes_history, "25.8",
5152
{

src/Disks/ObjectStorages/IObjectStorage.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <Interpreters/Context.h>
88
#include <Common/Exception.h>
99
#include <Common/ObjectStorageKeyGenerator.h>
10+
#include <Storages/ObjectStorage/DataLakes/IDataLakeMetadata.h>
1011

1112
#include <Poco/JSON/Object.h>
1213
#include <Poco/JSON/Parser.h>
@@ -101,6 +102,13 @@ WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings
101102
return write_settings;
102103
}
103104

105+
RelativePathWithMetadata::RelativePathWithMetadata(const DataFileInfo & info, std::optional<ObjectMetadata> metadata_)
106+
: metadata(std::move(metadata_))
107+
{
108+
relative_path = info.file_path;
109+
file_meta_info = info.file_meta_info;
110+
}
111+
104112
std::string RelativePathWithMetadata::getPathOrPathToArchiveIfArchive() const
105113
{
106114
if (isArchive())

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ struct ObjectMetadata
107107
ObjectAttributes attributes;
108108
};
109109

110+
111+
struct DataFileInfo;
112+
class DataFileMetaInfo;
113+
using DataFileMetaInfoPtr = std::shared_ptr<DataFileMetaInfo>;
114+
110115
struct DataLakeObjectMetadata;
111116

112117
struct RelativePathWithMetadata
@@ -134,19 +139,24 @@ struct RelativePathWithMetadata
134139
std::optional<ObjectMetadata> metadata;
135140
/// Delta lake related object metadata.
136141
std::optional<DataLakeObjectMetadata> data_lake_metadata;
142+
/// Information about columns
143+
std::optional<DataFileMetaInfoPtr> file_meta_info;
137144
/// Retry request after short pause
138145
CommandInTaskResponse command;
139146

140147
RelativePathWithMetadata() = default;
141148

142-
explicit RelativePathWithMetadata(const String & task_string, std::optional<ObjectMetadata> metadata_ = std::nullopt)
143-
: metadata(std::move(metadata_))
144-
, command(task_string)
149+
explicit RelativePathWithMetadata(String command_or_path, std::optional<ObjectMetadata> metadata_ = std::nullopt)
150+
: relative_path(std::move(command_or_path))
151+
, metadata(std::move(metadata_))
152+
, command(relative_path)
145153
{
146-
if (!command.is_parsed())
147-
relative_path = task_string;
154+
if (command.is_parsed())
155+
relative_path = "";
148156
}
149157

158+
explicit RelativePathWithMetadata(const DataFileInfo & info, std::optional<ObjectMetadata> metadata_ = std::nullopt);
159+
150160
RelativePathWithMetadata(const RelativePathWithMetadata & other) = default;
151161

152162
virtual ~RelativePathWithMetadata() = default;
@@ -158,6 +168,9 @@ struct RelativePathWithMetadata
158168
virtual size_t fileSizeInArchive() const { throw Exception(ErrorCodes::LOGICAL_ERROR, "Not an archive"); }
159169
virtual std::string getPathOrPathToArchiveIfArchive() const;
160170

171+
void setFileMetaInfo(std::optional<DataFileMetaInfoPtr> file_meta_info_ ) { file_meta_info = file_meta_info_; }
172+
std::optional<DataFileMetaInfoPtr> getFileMetaInfo() const { return file_meta_info; }
173+
161174
const CommandInTaskResponse & getCommand() const { return command; }
162175
};
163176

src/Interpreters/ClusterFunctionReadTask.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o
2929
if (object->data_lake_metadata.has_value())
3030
data_lake_metadata = object->data_lake_metadata.value();
3131

32+
file_meta_info = object->file_meta_info;
33+
3234
const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes];
3335
path = send_over_whole_archive ? object->getPathOrPathToArchiveIfArchive() : object->getPath();
3436
}
@@ -45,6 +47,7 @@ ObjectInfoPtr ClusterFunctionReadTaskResponse::getObjectInfo() const
4547

4648
auto object = std::make_shared<ObjectInfo>(path);
4749
object->data_lake_metadata = data_lake_metadata;
50+
object->file_meta_info = file_meta_info;
4851
return object;
4952
}
5053

@@ -61,6 +64,14 @@ void ClusterFunctionReadTaskResponse::serialize(WriteBuffer & out, size_t protoc
6164
else
6265
ActionsDAG().serialize(out, registry);
6366
}
67+
68+
if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA)
69+
{
70+
if (file_meta_info.has_value())
71+
file_meta_info.value()->serialize(out);
72+
else
73+
DataFileMetaInfo().serialize(out);
74+
}
6475
}
6576

6677
void ClusterFunctionReadTaskResponse::deserialize(ReadBuffer & in)
@@ -87,6 +98,14 @@ void ClusterFunctionReadTaskResponse::deserialize(ReadBuffer & in)
8798
data_lake_metadata.transform = std::move(transform);
8899
}
89100
}
101+
102+
if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA)
103+
{
104+
auto info = std::make_shared<DataFileMetaInfo>(DataFileMetaInfo::deserialize(in));
105+
106+
if (!path.empty() && !info->empty())
107+
file_meta_info = info;
108+
}
90109
}
91110

92111
}

src/Interpreters/ClusterFunctionReadTask.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ struct ClusterFunctionReadTaskResponse
2020
String path;
2121
/// Object metadata path, in case of data lake object.
2222
DataLakeObjectMetadata data_lake_metadata;
23+
/// File's columns info
24+
std::optional<DataFileMetaInfoPtr> file_meta_info;
2325

2426
/// Convert received response into ObjectInfo.
2527
ObjectInfoPtr getObjectInfo() const;

src/Processors/Chunk.cpp

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,12 @@ void Chunk::addColumn(ColumnPtr column)
108108

109109
void Chunk::addColumn(size_t position, ColumnPtr column)
110110
{
111-
if (position >= columns.size())
111+
if (position == columns.size())
112+
{
113+
addColumn(column);
114+
return;
115+
}
116+
if (position > columns.size())
112117
throw Exception(ErrorCodes::POSITION_OUT_OF_BOUND,
113118
"Position {} out of bound in Chunk::addColumn(), max position = {}",
114119
position, !columns.empty() ? columns.size() - 1 : 0);

0 commit comments

Comments (0)