Skip to content

Commit cfa1468

Browse files
authored
Merge pull request #1050 from Altinity/frontport/antalya-25.8/alternative_syntax
25.8 Antalya port - Alternative syntax for cluster functions
2 parents f15b21f + acf8dc1 commit cfa1468

File tree

71 files changed

+3068
-568
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+3068
-568
lines changed

src/Analyzer/FunctionSecretArgumentsFinderTreeNode.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,14 @@ class FunctionTreeNodeImpl : public AbstractFunction
7171
{
7272
public:
7373
explicit ArgumentsTreeNode(const QueryTreeNodes * arguments_) : arguments(arguments_) {}
74-
size_t size() const override { return arguments ? arguments->size() : 0; }
75-
std::unique_ptr<Argument> at(size_t n) const override { return std::make_unique<ArgumentTreeNode>(arguments->at(n).get()); }
74+
size_t size() const override
75+
{ /// size without skipped indexes
76+
return arguments ? arguments->size() - skippedSize() : 0;
77+
}
78+
std::unique_ptr<Argument> at(size_t n) const override
79+
{ /// n is relative index, some can be skipped
80+
return std::make_unique<ArgumentTreeNode>(arguments->at(getRealIndex(n)).get());
81+
}
7682
private:
7783
const QueryTreeNodes * arguments = nullptr;
7884
};

src/Core/Settings.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6970,6 +6970,15 @@ Enable PRQL - an alternative to SQL.
69706970
)", EXPERIMENTAL) \
69716971
DECLARE(Bool, enable_adaptive_memory_spill_scheduler, false, R"(
69726972
Trigger processor to spill data into external storage adpatively. grace join is supported at present.
6973+
)", EXPERIMENTAL) \
6974+
DECLARE(String, object_storage_cluster, "", R"(
6975+
Cluster to make distributed requests to object storages with alternative syntax.
6976+
)", EXPERIMENTAL) \
6977+
DECLARE(UInt64, object_storage_max_nodes, 0, R"(
6978+
Limit for hosts used for request in object storage cluster table functions - azureBlobStorageCluster, s3Cluster, hdfsCluster, etc.
6979+
Possible values:
6980+
- Positive integer.
6981+
- 0 — All hosts in cluster.
69736982
)", EXPERIMENTAL) \
69746983
DECLARE(Bool, allow_experimental_delta_kernel_rs, true, R"(
69756984
Allow experimental delta-kernel-rs implementation.

src/Core/SettingsChangesHistory.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
3939
/// controls new feature and it's 'true' by default, use 'false' as previous_value).
4040
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
4141
/// Note: please check if the key already exists to prevent duplicate entries.
42+
addSettingsChanges(settings_changes_history, "25.8.9.2000",
43+
{
44+
{"object_storage_cluster", "", "", "Antalya: New setting"},
45+
{"object_storage_max_nodes", 0, 0, "Antalya: New setting"},
46+
});
4247
addSettingsChanges(settings_changes_history, "25.8",
4348
{
4449
{"output_format_json_quote_64bit_integers", true, false, "Disable quoting of the 64 bit integers in JSON by default"},
@@ -133,6 +138,15 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
133138
{"allow_experimental_insert_into_iceberg", false, false, "New setting."},
134139
/// RELEASE CLOSED
135140
});
141+
addSettingsChanges(settings_changes_history, "25.6.5.2000",
142+
{
143+
{"allow_experimental_database_iceberg", false, true, "Turned ON by default for Antalya"},
144+
{"allow_experimental_database_unity_catalog", false, true, "Turned ON by default for Antalya"},
145+
{"allow_experimental_database_glue_catalog", false, true, "Turned ON by default for Antalya"},
146+
{"output_format_parquet_enum_as_byte_array", true, true, "Enable writing Enum as byte array in Parquet by default"},
147+
{"object_storage_cluster", "", "", "New setting"},
148+
{"object_storage_max_nodes", 0, 0, "New setting"},
149+
});
136150
addSettingsChanges(settings_changes_history, "25.6",
137151
{
138152
/// RELEASE CLOSED

src/Databases/DataLake/DatabaseDataLake.cpp

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <Storages/ConstraintsDescription.h>
2323
#include <Storages/StorageNull.h>
2424
#include <Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h>
25+
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>
2526

2627
#include <Interpreters/evaluateConstantExpression.h>
2728
#include <Interpreters/Context.h>
@@ -47,6 +48,7 @@ namespace DatabaseDataLakeSetting
4748
extern const DatabaseDataLakeSettingsString oauth_server_uri;
4849
extern const DatabaseDataLakeSettingsBool oauth_server_use_request_body;
4950
extern const DatabaseDataLakeSettingsBool vended_credentials;
51+
extern const DatabaseDataLakeSettingsString object_storage_cluster;
5052
extern const DatabaseDataLakeSettingsString aws_access_key_id;
5153
extern const DatabaseDataLakeSettingsString aws_secret_access_key;
5254
extern const DatabaseDataLakeSettingsString region;
@@ -176,7 +178,7 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
176178
return catalog_impl;
177179
}
178180

179-
std::shared_ptr<StorageObjectStorageConfiguration> DatabaseDataLake::getConfiguration(
181+
StorageObjectStorageConfigurationPtr DatabaseDataLake::getConfiguration(
180182
DatabaseDataLakeStorageType type,
181183
DataLakeStorageSettingsPtr storage_settings) const
182184
{
@@ -423,24 +425,25 @@ StoragePtr DatabaseDataLake::tryGetTableImpl(const String & name, ContextPtr con
423425

424426
/// with_table_structure = false: because there will be
425427
/// no table structure in table definition AST.
426-
StorageObjectStorageConfiguration::initialize(*configuration, args, context_copy, /* with_table_structure */false);
428+
configuration->initialize(args, context_copy, /* with_table_structure */false);
427429

428-
return std::make_shared<StorageObjectStorage>(
430+
auto cluster_name = settings[DatabaseDataLakeSetting::object_storage_cluster].value;
431+
432+
return std::make_shared<StorageObjectStorageCluster>(
433+
cluster_name,
429434
configuration,
430435
configuration->createObjectStorage(context_copy, /* is_readonly */ false),
431-
context_copy,
432436
StorageID(getDatabaseName(), name),
433437
/* columns */columns,
434438
/* constraints */ConstraintsDescription{},
439+
/* partition_by */nullptr,
440+
context_copy,
435441
/* comment */"",
436442
getFormatSettings(context_copy),
437443
LoadingStrictnessLevel::CREATE,
438444
getCatalog(),
439445
/* if_not_exists*/true,
440446
/* is_datalake_query*/true,
441-
/* distributed_processing */false,
442-
/* partition_by */nullptr,
443-
/* is_table_function */false,
444447
/* lazy_init */true);
445448
}
446449

src/Databases/DataLake/DatabaseDataLake.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ class DatabaseDataLake final : public IDatabase, WithContext
8888
void validateSettings();
8989
std::shared_ptr<DataLake::ICatalog> getCatalog() const;
9090

91-
std::shared_ptr<StorageObjectStorageConfiguration> getConfiguration(
91+
StorageObjectStorageConfigurationPtr getConfiguration(
9292
DatabaseDataLakeStorageType type,
9393
DataLakeStorageSettingsPtr storage_settings) const;
9494

src/Databases/DataLake/GlueCatalog.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ bool GlueCatalog::classifyTimestampTZ(const String & column_name, const TableMet
451451
auto storage_settings = std::make_shared<DB::DataLakeStorageSettings>();
452452
storage_settings->loadFromSettingsChanges(settings.allChanged());
453453
auto configuration = std::make_shared<DB::StorageS3IcebergConfiguration>(storage_settings);
454-
DB::StorageObjectStorageConfiguration::initialize(*configuration, args, getContext(), false);
454+
configuration->initialize(args, getContext(), false);
455455

456456
auto object_storage = configuration->createObjectStorage(getContext(), true);
457457
const auto & read_settings = getContext()->getReadSettings();

src/Disks/DiskType.cpp

Lines changed: 44 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ namespace ErrorCodes
1212
extern const int LOGICAL_ERROR;
1313
}
1414

15-
MetadataStorageType metadataTypeFromString(const String & type)
15+
MetadataStorageType metadataTypeFromString(const std::string & type)
1616
{
1717
auto check_type = Poco::toLower(type);
1818
if (check_type == "local")
@@ -60,25 +60,49 @@ std::string DataSourceDescription::toString() const
6060
case DataSourceType::RAM:
6161
return "memory";
6262
case DataSourceType::ObjectStorage:
63-
{
64-
switch (object_storage_type)
65-
{
66-
case ObjectStorageType::S3:
67-
return "s3";
68-
case ObjectStorageType::HDFS:
69-
return "hdfs";
70-
case ObjectStorageType::Azure:
71-
return "azure_blob_storage";
72-
case ObjectStorageType::Local:
73-
return "local_blob_storage";
74-
case ObjectStorageType::Web:
75-
return "web";
76-
case ObjectStorageType::None:
77-
return "none";
78-
case ObjectStorageType::Max:
79-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected object storage type: Max");
80-
}
81-
}
63+
return DB::toString(object_storage_type);
8264
}
8365
}
66+
67+
ObjectStorageType objectStorageTypeFromString(const std::string & type)
68+
{
69+
auto check_type = Poco::toLower(type);
70+
if (check_type == "s3")
71+
return ObjectStorageType::S3;
72+
if (check_type == "hdfs")
73+
return ObjectStorageType::HDFS;
74+
if (check_type == "azure_blob_storage" || check_type == "azure")
75+
return ObjectStorageType::Azure;
76+
if (check_type == "local_blob_storage" || check_type == "local")
77+
return ObjectStorageType::Local;
78+
if (check_type == "web")
79+
return ObjectStorageType::Web;
80+
if (check_type == "none")
81+
return ObjectStorageType::None;
82+
83+
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG,
84+
"Unknown object storage type: {}", type);
85+
}
86+
87+
std::string toString(ObjectStorageType type)
88+
{
89+
switch (type)
90+
{
91+
case ObjectStorageType::S3:
92+
return "s3";
93+
case ObjectStorageType::HDFS:
94+
return "hdfs";
95+
case ObjectStorageType::Azure:
96+
return "azure_blob_storage";
97+
case ObjectStorageType::Local:
98+
return "local_blob_storage";
99+
case ObjectStorageType::Web:
100+
return "web";
101+
case ObjectStorageType::None:
102+
return "none";
103+
case ObjectStorageType::Max:
104+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected object storage type: Max");
105+
}
106+
}
107+
84108
}

src/Disks/DiskType.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,10 @@ enum class MetadataStorageType : uint8_t
3636
Memory,
3737
};
3838

39-
MetadataStorageType metadataTypeFromString(const String & type);
40-
String toString(DataSourceType data_source_type);
39+
MetadataStorageType metadataTypeFromString(const std::string & type);
40+
41+
ObjectStorageType objectStorageTypeFromString(const std::string & type);
42+
std::string toString(ObjectStorageType type);
4143

4244
struct DataSourceDescription
4345
{

src/Interpreters/Cluster.cpp

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -732,9 +732,9 @@ void Cluster::initMisc()
732732
}
733733
}
734734

735-
std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard) const
735+
std::unique_ptr<Cluster> Cluster::getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts) const
736736
{
737-
return std::unique_ptr<Cluster>{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard)};
737+
return std::unique_ptr<Cluster>{ new Cluster(ReplicasAsShardsTag{}, *this, settings, max_replicas_from_shard, max_hosts)};
738738
}
739739

740740
std::unique_ptr<Cluster> Cluster::getClusterWithSingleShard(size_t index) const
@@ -783,7 +783,7 @@ void shuffleReplicas(std::vector<Cluster::Address> & replicas, const Settings &
783783

784784
}
785785

786-
Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard)
786+
Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts)
787787
{
788788
if (from.addresses_with_failover.empty())
789789
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster is empty");
@@ -805,6 +805,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
805805

806806
if (address.is_local)
807807
info.local_addresses.push_back(address);
808+
addresses_with_failover.emplace_back(Addresses({address}));
808809

809810
auto pool = ConnectionPoolFactory::instance().get(
810811
static_cast<unsigned>(settings[Setting::distributed_connections_pool_size]),
@@ -828,9 +829,6 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
828829
info.per_replica_pools = {std::move(pool)};
829830
info.default_database = address.default_database;
830831

831-
addresses_with_failover.emplace_back(Addresses{address});
832-
833-
slot_to_shard.insert(std::end(slot_to_shard), info.weight, shards_info.size());
834832
shards_info.emplace_back(std::move(info));
835833
}
836834
};
@@ -852,10 +850,37 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
852850
secret = from.secret;
853851
name = from.name;
854852

853+
constrainShardInfoAndAddressesToMaxHosts(max_hosts);
854+
855+
for (size_t i = 0; i < shards_info.size(); ++i)
856+
slot_to_shard.insert(std::end(slot_to_shard), shards_info[i].weight, i);
857+
855858
initMisc();
856859
}
857860

858861

862+
void Cluster::constrainShardInfoAndAddressesToMaxHosts(size_t max_hosts)
863+
{
864+
if (max_hosts == 0 || shards_info.size() <= max_hosts)
865+
return;
866+
867+
pcg64_fast gen{randomSeed()};
868+
std::shuffle(shards_info.begin(), shards_info.end(), gen);
869+
shards_info.resize(max_hosts);
870+
871+
AddressesWithFailover addresses_with_failover_;
872+
873+
UInt32 shard_num = 0;
874+
for (auto & shard_info : shards_info)
875+
{
876+
addresses_with_failover_.push_back(addresses_with_failover[shard_info.shard_num - 1]);
877+
shard_info.shard_num = ++shard_num;
878+
}
879+
880+
addresses_with_failover.swap(addresses_with_failover_);
881+
}
882+
883+
859884
Cluster::Cluster(Cluster::SubclusterTag, const Cluster & from, const std::vector<size_t> & indices)
860885
{
861886
for (size_t index : indices)

src/Interpreters/Cluster.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ class Cluster
270270
std::unique_ptr<Cluster> getClusterWithMultipleShards(const std::vector<size_t> & indices) const;
271271

272272
/// Get a new Cluster that contains all servers (all shards with all replicas) from existing cluster as independent shards.
273-
std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0) const;
273+
std::unique_ptr<Cluster> getClusterWithReplicasAsShards(const Settings & settings, size_t max_replicas_from_shard = 0, size_t max_hosts = 0) const;
274274

275275
/// Returns false if cluster configuration doesn't allow to use it for cross-replication.
276276
/// NOTE: true does not mean, that it's actually a cross-replication cluster.
@@ -296,7 +296,7 @@ class Cluster
296296

297297
/// For getClusterWithReplicasAsShards implementation
298298
struct ReplicasAsShardsTag {};
299-
Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard);
299+
Cluster(ReplicasAsShardsTag, const Cluster & from, const Settings & settings, size_t max_replicas_from_shard, size_t max_hosts);
300300

301301
void addShard(
302302
const Settings & settings,
@@ -308,6 +308,9 @@ class Cluster
308308
ShardInfoInsertPathForInternalReplication insert_paths = {},
309309
bool internal_replication = false);
310310

311+
/// Reduce size of cluster to max_hosts
312+
void constrainShardInfoAndAddressesToMaxHosts(size_t max_hosts);
313+
311314
/// Inter-server secret
312315
String secret;
313316

0 commit comments

Comments
 (0)