Skip to content

Commit 770d011

Browse files
authored
Merge pull request #1075 from Altinity/frontport/antalya-25.8/iceberg_features
Antalya 25.8 port of #756, #770, #822, #898
2 parents 70fba7c + 708d59b commit 770d011

File tree

18 files changed

+201
-42
lines changed

18 files changed

+201
-42
lines changed

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7071,6 +7071,9 @@ Allow Iceberg read optimization based on Iceberg metadata.
70717071
)", EXPERIMENTAL) \
70727072
DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
70737073
Allow retries in cluster request, when one node goes offline
7074+
)", EXPERIMENTAL) \
7075+
DECLARE(Bool, object_storage_remote_initiator, false, R"(
7076+
Execute request to object storage as remote on one of object_storage_cluster nodes.
70747077
)", EXPERIMENTAL) \
70757078
\
70767079
/** Experimental timeSeries* aggregate functions. */ \

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
4747
{"object_storage_cluster", "", "", "New setting"},
4848
{"object_storage_max_nodes", 0, 0, "New setting"},
4949
{"allow_retries_in_cluster_requests", false, false, "New setting"},
50+
{"object_storage_remote_initiator", false, false, "New setting."},
5051
});
5152
addSettingsChanges(settings_changes_history, "25.8",
5253
{

src/Databases/DataLake/DataLakeConstants.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ namespace DataLake
88
{
99

1010
static constexpr auto DATABASE_ENGINE_NAME = "DataLakeCatalog";
11+
static constexpr auto DATABASE_ALIAS_NAME = "Iceberg";
1112
static constexpr std::string_view FILE_PATH_PREFIX = "file:/";
1213

1314
/// Some catalogs (Unity or Glue) may store not only Iceberg/DeltaLake tables but other kinds of "tables"

src/Databases/DataLake/DatabaseDataLake.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -732,6 +732,11 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
732732
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `{}` must have arguments", database_engine_name);
733733
}
734734

735+
if (database_engine_name == "Iceberg" && catalog_type != DatabaseDataLakeCatalogType::ICEBERG_REST)
736+
{
737+
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Engine `Iceberg` must have `rest` catalog type only");
738+
}
739+
735740
for (auto & engine_arg : engine_args)
736741
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.context);
737742

@@ -813,6 +818,7 @@ void registerDatabaseDataLake(DatabaseFactory & factory)
813818
args.uuid);
814819
};
815820
factory.registerDatabase("DataLakeCatalog", create_fn, { .supports_arguments = true, .supports_settings = true });
821+
factory.registerDatabase("Iceberg", create_fn, { .supports_arguments = true, .supports_settings = true });
816822
}
817823

818824
}

src/IO/ReadBufferFromS3.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,12 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
445445
log, "Read S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}",
446446
bucket, key, version_id.empty() ? "Latest" : version_id, range_begin);
447447
}
448+
else
449+
{
450+
LOG_TEST(
451+
log, "Read S3 object. Bucket: {}, Key: {}, Version: {}",
452+
bucket, key, version_id.empty() ? "Latest" : version_id);
453+
}
448454

449455
ProfileEvents::increment(ProfileEvents::S3GetObject);
450456
if (client_ptr->isClientForDisk())

src/Parsers/ASTSetQuery.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,8 @@ void ASTSetQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & format,
129129
return true;
130130
}
131131

132-
if (DataLake::DATABASE_ENGINE_NAME == state.create_engine_name)
132+
if (DataLake::DATABASE_ENGINE_NAME == state.create_engine_name
133+
|| DataLake::DATABASE_ALIAS_NAME == state.create_engine_name)
133134
{
134135
if (DataLake::SETTINGS_TO_HIDE.contains(change.name))
135136
{

src/Parsers/FunctionSecretArgumentsFinder.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -736,7 +736,7 @@ class FunctionSecretArgumentsFinder
736736
/// S3('url', 'access_key_id', 'secret_access_key')
737737
findS3DatabaseSecretArguments();
738738
}
739-
else if (engine_name == "DataLakeCatalog")
739+
else if (engine_name == "DataLakeCatalog" || engine_name == "Iceberg")
740740
{
741741
findDataLakeCatalogSecretArguments();
742742
}

src/Storages/IStorageCluster.cpp

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
#include <Storages/IStorageCluster.h>
22

3+
#include <pcg_random.hpp>
4+
#include <Common/randomSeed.h>
5+
36
#include <Common/Exception.h>
47
#include <Core/Settings.h>
58
#include <Core/QueryProcessingStage.h>
@@ -13,6 +16,7 @@
1316
#include <Interpreters/AddDefaultDatabaseVisitor.h>
1417
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
1518
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
19+
#include <Planner/Utils.h>
1620
#include <Processors/Sources/NullSource.h>
1721
#include <Processors/Sources/RemoteSource.h>
1822
#include <QueryPipeline/narrowPipe.h>
@@ -28,6 +32,8 @@
2832
#include <Analyzer/QueryNode.h>
2933
#include <Analyzer/ColumnNode.h>
3034
#include <Analyzer/InDepthQueryTreeVisitor.h>
35+
#include <Storages/StorageDistributed.h>
36+
#include <TableFunctions/TableFunctionFactory.h>
3137

3238
#include <algorithm>
3339
#include <memory>
@@ -47,6 +53,7 @@ namespace Setting
4753
extern const SettingsNonZeroUInt64 max_parallel_replicas;
4854
extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode;
4955
extern const SettingsUInt64 object_storage_max_nodes;
56+
extern const SettingsBool object_storage_remote_initiator;
5057
}
5158

5259
namespace ErrorCodes
@@ -283,8 +290,9 @@ void IStorageCluster::read(
283290

284291
storage_snapshot->check(column_names);
285292

286-
updateBeforeRead(context);
287-
auto cluster = getClusterImpl(context, cluster_name_from_settings, context->getSettingsRef()[Setting::object_storage_max_nodes]);
293+
const auto & settings = context->getSettingsRef();
294+
295+
auto cluster = getClusterImpl(context, cluster_name_from_settings, settings[Setting::object_storage_max_nodes]);
288296

289297
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
290298

@@ -293,7 +301,7 @@ void IStorageCluster::read(
293301

294302
updateQueryWithJoinToSendIfNeeded(query_to_send, query_info.query_tree, context);
295303

296-
if (context->getSettingsRef()[Setting::allow_experimental_analyzer])
304+
if (settings[Setting::allow_experimental_analyzer])
297305
{
298306
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_to_send, context, SelectQueryOptions(processed_stage));
299307
}
@@ -306,6 +314,17 @@ void IStorageCluster::read(
306314

307315
updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context);
308316

317+
if (settings[Setting::object_storage_remote_initiator])
318+
{
319+
auto storage_and_context = convertToRemote(cluster, context, cluster_name_from_settings, query_to_send);
320+
auto src_distributed = std::dynamic_pointer_cast<StorageDistributed>(storage_and_context.storage);
321+
auto modified_query_info = query_info;
322+
modified_query_info.cluster = src_distributed->getCluster();
323+
auto new_storage_snapshot = storage_and_context.storage->getStorageSnapshot(storage_snapshot->metadata, storage_and_context.context);
324+
storage_and_context.storage->read(query_plan, column_names, new_storage_snapshot, modified_query_info, storage_and_context.context, processed_stage, max_block_size, num_streams);
325+
return;
326+
}
327+
309328
RestoreQualifiedNamesVisitor::Data data;
310329
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_to_send->as<ASTSelectQuery &>(), 0));
311330
data.remote_table.database = context->getCurrentDatabase();
@@ -333,6 +352,62 @@ void IStorageCluster::read(
333352
query_plan.addStep(std::move(reading));
334353
}
335354

355+
/// Re-targets the query at a single, randomly chosen node of the cluster so that this node
/// becomes the initiator of the object storage request.
///
/// The table function found in `query_to_send` is wrapped into `remote('<host>', ...)`
/// (or `remoteSecure` when the chosen address has secure mode enabled), and that table function
/// is executed to obtain the storage to read from. `query_to_send` is modified in place.
///
/// @param cluster                     Cluster whose shard addresses are candidates for the initiator.
/// @param context                     Query context; a copy with `object_storage_remote_initiator`
///                                    switched off is created and returned.
/// @param cluster_name_from_settings  Cluster name, used only in error messages.
/// @param query_to_send               SELECT query AST to rewrite.
/// @return The storage produced by the remote table function together with the modified context.
/// @throws Exception(LOGICAL_ERROR) if the cluster is empty, the chosen shard does not have exactly
///         one address, the query is not a SELECT, or no table function can be found in it.
IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote(
    ClusterPtr cluster,
    ContextPtr context,
    const std::string & cluster_name_from_settings,
    ASTPtr query_to_send)
{
    const auto & host_addresses = cluster->getShardsAddresses();
    if (host_addresses.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty cluster {}", cluster_name_from_settings);

    /// One generator per thread: a single function-local `static` generator would be mutated
    /// concurrently by every query thread that reaches this point, which is a data race.
    thread_local pcg64 rng(randomSeed());
    const size_t shard_num = rng() % host_addresses.size();
    const auto & shard_addresses = host_addresses[shard_num];
    /// After getClusterImpl each shard must have exactly 1 replica
    if (shard_addresses.size() != 1)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of shard {} in cluster {} is not equal 1", shard_num, cluster_name_from_settings);
    const auto & initiator_address = shard_addresses[0];
    auto host_name = initiator_address.toString();

    LOG_INFO(log, "Choose remote initiator '{}'", host_name);

    const bool secure = initiator_address.secure == Protocol::Secure::Enable;
    const std::string remote_function_name = secure ? "remoteSecure" : "remote";

    /// Clean object_storage_remote_initiator setting to avoid infinite remote call
    auto new_context = Context::createCopy(context);
    new_context->setSetting("object_storage_remote_initiator", false);

    auto * select_query = query_to_send->as<ASTSelectQuery>();
    if (!select_query)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query");

    /// The setting may also be spelled out in the query's own SETTINGS clause; remove it there too,
    /// and drop the clause altogether if nothing else is left in it.
    if (auto query_settings = select_query->settings())
    {
        auto & settings_ast = query_settings->as<ASTSetQuery &>();
        if (settings_ast.changes.removeSetting("object_storage_remote_initiator") && settings_ast.changes.empty())
            select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, {});
    }

    ASTTableExpression * table_expression = extractTableExpressionASTPtrFromSelectQuery(query_to_send);
    if (!table_expression)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table expression");

    /// A null table function would become a null argument of `remote(...)` below.
    /// NOTE(review): presumably the query always carries a table function by this point - confirm.
    if (!table_expression->table_function)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table function in table expression");

    auto remote_query = makeASTFunction(remote_function_name, std::make_shared<ASTLiteral>(host_name), table_expression->table_function);

    table_expression->table_function = remote_query;

    auto remote_function = TableFunctionFactory::instance().get(remote_query, new_context);

    auto storage = remote_function->execute(query_to_send, new_context, remote_function_name);

    return RemoteCallVariables{storage, new_context};
}
410+
336411
SinkToStoragePtr IStorageCluster::write(
337412
const ASTPtr & query,
338413
const StorageMetadataPtr & metadata_snapshot,

src/Storages/IStorageCluster.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,21 @@ class IStorageCluster : public IStorage
6060
virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); }
6161

6262
protected:
63-
virtual void updateBeforeRead(const ContextPtr &) {}
6463
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}
6564
void updateQueryWithJoinToSendIfNeeded(ASTPtr & query_to_send, QueryTreeNodePtr query_tree, const ContextPtr & context);
6665

66+
/// Result of convertToRemote(): what read() needs to continue through the remote table function.
struct RemoteCallVariables
67+
{
68+
StoragePtr storage; ///< Storage obtained by executing the `remote` / `remoteSecure` table function.
69+
ContextPtr context; ///< Copy of the query context with `object_storage_remote_initiator` set to false.
70+
};
71+
72+
RemoteCallVariables convertToRemote(
73+
ClusterPtr cluster,
74+
ContextPtr context,
75+
const std::string & cluster_name_from_settings,
76+
ASTPtr query_to_send);
77+
6778
virtual void readFallBackToPure(
6879
QueryPlan & /* query_plan */,
6980
const Names & /* column_names */,

src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h

Lines changed: 52 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <Parsers/ASTLiteral.h>
2020
#include <Parsers/ASTFunction.h>
2121
#include <Parsers/ASTIdentifier.h>
22+
#include <Parsers/ASTSetQuery.h>
2223
#include <Disks/DiskType.h>
2324

2425
#include <memory>
@@ -42,20 +43,21 @@ namespace ErrorCodes
4243

4344
namespace DataLakeStorageSetting
4445
{
45-
extern DataLakeStorageSettingsBool allow_dynamic_metadata_for_data_lakes;
46-
extern DataLakeStorageSettingsDatabaseDataLakeCatalogType storage_catalog_type;
47-
extern DataLakeStorageSettingsString object_storage_endpoint;
48-
extern DataLakeStorageSettingsString storage_aws_access_key_id;
49-
extern DataLakeStorageSettingsString storage_aws_secret_access_key;
50-
extern DataLakeStorageSettingsString storage_region;
51-
extern DataLakeStorageSettingsString storage_catalog_url;
52-
extern DataLakeStorageSettingsString storage_warehouse;
53-
extern DataLakeStorageSettingsString storage_catalog_credential;
54-
55-
extern DataLakeStorageSettingsString storage_auth_scope;
56-
extern DataLakeStorageSettingsString storage_auth_header;
57-
extern DataLakeStorageSettingsString storage_oauth_server_uri;
58-
extern DataLakeStorageSettingsBool storage_oauth_server_use_request_body;
46+
extern const DataLakeStorageSettingsBool allow_dynamic_metadata_for_data_lakes;
47+
extern const DataLakeStorageSettingsDatabaseDataLakeCatalogType storage_catalog_type;
48+
extern const DataLakeStorageSettingsString object_storage_endpoint;
49+
extern const DataLakeStorageSettingsString storage_aws_access_key_id;
50+
extern const DataLakeStorageSettingsString storage_aws_secret_access_key;
51+
extern const DataLakeStorageSettingsString storage_region;
52+
extern const DataLakeStorageSettingsString storage_catalog_url;
53+
extern const DataLakeStorageSettingsString storage_warehouse;
54+
extern const DataLakeStorageSettingsString storage_catalog_credential;
55+
56+
extern const DataLakeStorageSettingsString storage_auth_scope;
57+
extern const DataLakeStorageSettingsString storage_auth_header;
58+
extern const DataLakeStorageSettingsString storage_oauth_server_uri;
59+
extern const DataLakeStorageSettingsBool storage_oauth_server_use_request_body;
60+
extern const DataLakeStorageSettingsString iceberg_metadata_file_path;
5961
}
6062

6163
template <typename T>
@@ -324,6 +326,42 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
324326
current_metadata->addDeleteTransformers(object_info, builder, format_settings, local_context);
325327
}
326328

329+
/// Builds the engine arguments through the base configuration and, when `iceberg_metadata_file_path`
/// was explicitly changed, records it in the SETTINGS clause of those arguments:
/// the first SETTINGS clause already present is reused, otherwise a new one is appended.
/// Throws LOGICAL_ERROR if the base arguments are not an expression list.
ASTPtr createArgsWithAccessData() const override
{
    auto args_ast = BaseStorageConfiguration::createArgsWithAccessData();

    auto metadata_path_setting = (*settings)[DataLakeStorageSetting::iceberg_metadata_file_path];
    if (!metadata_path_setting.changed)
        return args_ast;

    auto * expression_list = args_ast->template as<ASTExpressionList>();
    if (!expression_list)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Arguments are not an expression list");

    /// Look for the first SETTINGS clause among the arguments.
    ASTSetQuery * target_settings = nullptr;
    for (auto & child : expression_list->children)
    {
        target_settings = child->template as<ASTSetQuery>();
        if (target_settings)
            break;
    }

    /// None found: append a fresh, non-standalone SETTINGS clause.
    if (!target_settings)
    {
        auto appended_settings = std::make_shared<ASTSetQuery>();
        appended_settings->is_standalone = false;
        expression_list->children.push_back(appended_settings);
        target_settings = appended_settings.get();
    }

    target_settings->changes.setSetting("iceberg_metadata_file_path", metadata_path_setting.value);
    return args_ast;
}
364+
327365
private:
328366
DataLakeMetadataPtr current_metadata;
329367
LoggerPtr log = getLogger("DataLakeConfiguration");

0 commit comments

Comments
 (0)