Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion src/Analyzer/FunctionNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <DataTypes/DataTypeSet.h>

#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSetQuery.h>

#include <Functions/IFunction.h>

Expand Down Expand Up @@ -164,14 +165,21 @@ void FunctionNode::dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state
buffer << '\n' << std::string(indent + 2, ' ') << "WINDOW\n";
getWindowNode()->dumpTreeImpl(buffer, format_state, indent + 4);
}

if (!settings_changes.empty())
{
buffer << '\n' << std::string(indent + 2, ' ') << "SETTINGS";
for (const auto & change : settings_changes)
buffer << fmt::format(" {}={}", change.name, fieldToString(change.value));
}
}

bool FunctionNode::isEqualImpl(const IQueryTreeNode & rhs, CompareOptions compare_options) const
{
const auto & rhs_typed = assert_cast<const FunctionNode &>(rhs);
if (function_name != rhs_typed.function_name || isAggregateFunction() != rhs_typed.isAggregateFunction()
|| isOrdinaryFunction() != rhs_typed.isOrdinaryFunction() || isWindowFunction() != rhs_typed.isWindowFunction()
|| nulls_action != rhs_typed.nulls_action)
|| nulls_action != rhs_typed.nulls_action || settings_changes != rhs_typed.settings_changes)
return false;

/// is_operator is ignored here because it affects only AST formatting
Expand Down Expand Up @@ -206,6 +214,17 @@ void FunctionNode::updateTreeHashImpl(HashState & hash_state, CompareOptions com
hash_state.update(isWindowFunction());
hash_state.update(nulls_action);

hash_state.update(settings_changes.size());
for (const auto & change : settings_changes)
{
hash_state.update(change.name.size());
hash_state.update(change.name);

const auto & value_dump = change.value.dump();
hash_state.update(value_dump.size());
hash_state.update(value_dump);
}

/// is_operator is ignored here because it affects only AST formatting

if (!compare_options.compare_types)
Expand All @@ -230,6 +249,7 @@ QueryTreeNodePtr FunctionNode::cloneImpl() const
result_function->nulls_action = nulls_action;
result_function->wrap_with_nullable = wrap_with_nullable;
result_function->is_operator = is_operator;
result_function->settings_changes = settings_changes;

return result_function;
}
Expand Down Expand Up @@ -292,6 +312,14 @@ ASTPtr FunctionNode::toASTImpl(const ConvertToASTOptions & options) const
function_ast->window_definition = window_node->toAST(new_options);
}

if (!settings_changes.empty())
{
auto settings_ast = make_intrusive<ASTSetQuery>();
settings_ast->changes = settings_changes;
settings_ast->is_standalone = false;
function_ast->arguments->children.push_back(settings_ast);
}

return function_ast;
}

Expand Down
15 changes: 15 additions & 0 deletions src/Analyzer/FunctionNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <Functions/IFunction.h>
#include <Parsers/NullsAction.h>
#include <Common/typeid_cast.h>
#include <Common/SettingsChanges.h>

namespace DB
{
Expand Down Expand Up @@ -204,6 +205,18 @@ class FunctionNode final : public IQueryTreeNode
wrap_with_nullable = true;
}

/// Get settings changes passed to table function
const SettingsChanges & getSettingsChanges() const
{
return settings_changes;
}

/// Set settings changes passed as last argument to table function
void setSettingsChanges(SettingsChanges settings_changes_)
{
settings_changes = std::move(settings_changes_);
}

void dumpTreeImpl(WriteBuffer & buffer, FormatState & format_state, size_t indent) const override;

protected:
Expand All @@ -228,6 +241,8 @@ class FunctionNode final : public IQueryTreeNode
static constexpr size_t arguments_child_index = 1;
static constexpr size_t window_child_index = 2;
static constexpr size_t children_size = window_child_index + 1;

SettingsChanges settings_changes;
};

}
7 changes: 6 additions & 1 deletion src/Analyzer/QueryTreeBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,12 @@ QueryTreeNodePtr QueryTreeBuilder::buildExpression(const ASTPtr & expression, co
{
const auto & function_arguments_list = function->arguments->as<ASTExpressionList>()->children;
for (const auto & argument : function_arguments_list)
function_node->getArguments().getNodes().push_back(buildExpression(argument, context));
{
if (const auto * ast_set = argument->as<ASTSetQuery>())
function_node->setSettingsChanges(ast_set->changes);
else
function_node->getArguments().getNodes().push_back(buildExpression(argument, context));
}
}

if (function->is_window_function)
Expand Down
1 change: 1 addition & 0 deletions src/Analyzer/Resolve/QueryAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3861,6 +3861,7 @@ void QueryAnalyzer::resolveTableFunction(QueryTreeNodePtr & table_function_node,
{
auto table_function_node_to_resolve_typed = std::make_shared<TableFunctionNode>(table_function_argument_function_name);
table_function_node_to_resolve_typed->getArgumentsNode() = table_function_argument_function->getArgumentsNode();
table_function_node_to_resolve_typed->setSettingsChanges(table_function_argument_function->getSettingsChanges());

QueryTreeNodePtr table_function_node_to_resolve = std::move(table_function_node_to_resolve_typed);
if (table_function_argument_function_name == "view")
Expand Down
3 changes: 3 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7658,6 +7658,9 @@ Rewrite expressions like 'x IN subquery' to JOIN. This might be useful for optim
)", EXPERIMENTAL) \
DECLARE(Bool, object_storage_remote_initiator, false, R"(
Execute request to object storage as remote on one of object_storage_cluster nodes.
)", EXPERIMENTAL) \
DECLARE(String, object_storage_remote_initiator_cluster, "", R"(
Cluster to choose remote initiator, when `object_storage_remote_initiator` is true. When empty, `object_storage_cluster` is used.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"(
Allow Iceberg read optimization based on Iceberg metadata.
Expand Down
9 changes: 1 addition & 8 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
addSettingsChanges(settings_changes_history, "26.1.3.20001.altinityantalya",
{
{"iceberg_partition_timezone", "", "", "New setting."},
// {"object_storage_max_nodes", 0, 0, "Antalya: New setting"},
{"s3_propagate_credentials_to_other_storages", false, false, "New setting"},
{"export_merge_tree_part_filename_pattern", "", "{part_name}_{checksum}", "New setting"},
{"object_storage_remote_initiator_cluster", "", "", "New setting."},
{"iceberg_metadata_staleness_ms", 0, 0, "New setting allowing using cached metadata version at READ operations to prevent fetching from remote catalog"},
});
addSettingsChanges(settings_changes_history, "26.1",
Expand Down Expand Up @@ -247,26 +247,19 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
{"lock_object_storage_task_distribution_ms", 500, 500, "New setting."},
{"allow_retries_in_cluster_requests", false, false, "New setting"},
// {"object_storage_remote_initiator", false, false, "New setting."},
{"allow_experimental_export_merge_tree_part", false, true, "Turned ON by default for Antalya."},
{"export_merge_tree_part_overwrite_file_if_exists", false, false, "New setting."},
{"export_merge_tree_partition_force_export", false, false, "New setting."},
{"export_merge_tree_partition_max_retries", 3, 3, "New setting."},
{"export_merge_tree_partition_manifest_ttl", 180, 180, "New setting."},
{"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."},
// {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."},
{"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."},
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
{"enable_alias_marker", true, true, "New setting."},
// {"input_format_parquet_use_native_reader_v3", false, true, "Seems stable"},
// {"input_format_parquet_verify_checksums", true, true, "New setting."},
// {"output_format_parquet_write_checksums", false, true, "New setting."},
{"export_merge_tree_part_max_bytes_per_file", 0, 0, "New setting."},
{"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."},
{"export_merge_tree_partition_lock_inside_the_task", false, false, "New setting."},
{"export_merge_tree_partition_system_table_prefer_remote_information", true, true, "New setting."},
// {"cluster_table_function_split_granularity", "file", "file", "New setting."},
// {"cluster_table_function_buckets_batch_size", 0, 0, "New setting."},
{"export_merge_tree_part_throw_on_pending_mutations", true, true, "New setting."},
{"export_merge_tree_part_throw_on_pending_patch_parts", true, true, "New setting."},
{"object_storage_cluster", "", "", "Antalya: New setting"},
Expand Down
41 changes: 35 additions & 6 deletions src/Storages/IStorageCluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,10 @@ namespace Setting
extern const SettingsBool async_query_sending_for_remote;
extern const SettingsBool async_socket_for_remote;
extern const SettingsBool skip_unavailable_shards;
extern const SettingsBool parallel_replicas_local_plan;
extern const SettingsString cluster_for_parallel_replicas;
extern const SettingsNonZeroUInt64 max_parallel_replicas;
extern const SettingsUInt64 object_storage_max_nodes;
extern const SettingsBool object_storage_remote_initiator;
extern const SettingsString object_storage_remote_initiator_cluster;
extern const SettingsObjectStorageClusterJoinMode object_storage_cluster_join_mode;
}

Expand Down Expand Up @@ -330,8 +329,6 @@ void IStorageCluster::read(

const auto & settings = context->getSettingsRef();

auto cluster = getClusterImpl(context, cluster_name_from_settings, isObjectStorage() ? settings[Setting::object_storage_max_nodes] : 0);

/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)

SharedHeader sample_block;
Expand All @@ -352,9 +349,21 @@ void IStorageCluster::read(

updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context);

/// In case the current node is not supposed to initiate the clustered query
/// Sends this query to a remote initiator using the `remote` table function
if (settings[Setting::object_storage_remote_initiator])
{
auto storage_and_context = convertToRemote(cluster, context, cluster_name_from_settings, query_to_send);
/// Re-writes queries in the form of:
/// Input: SELECT * FROM iceberg(...) SETTINGS object_storage_cluster='swarm', object_storage_remote_initiator=1
/// Output: SELECT * FROM remote('remote_host', icebergCluster('swarm', ...)
/// Where `remote_host` is a random host from the cluster which will execute the query
/// This means the initiator node belongs to the same cluster that will execute the query
/// In case remote_initiator_cluster_name is set, the initiator might be set to a different cluster
auto remote_initiator_cluster_name = settings[Setting::object_storage_remote_initiator_cluster].value;
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment explaining what this code block does. It took me a while to understand it by just reading the code.

I suggest something like:

/// In case the current node is not supposed to initiate the clustered query
/// Sends this query to a remote initiator using the `remote` table function
if (settings[Setting::object_storage_remote_initiator])
{
      /// Re-writes queries in the form of:
      /// Input: SELECT * FROM iceberg(...) SETTINGS object_storage_cluster='swarm', object_storage_remote_initiator=1
     /// Output: SELECT * FROM remote('remote_host', icebergCluster('swarm', ...)
     /// Where `remote_host` is a random host from the cluster which will execute the query
     /// This means the initiator node belongs to the same cluster that will execute the query
     /// In case remote_initiator_cluster_name is set, the initiator might be set to a different cluster
}

if (remote_initiator_cluster_name.empty())
remote_initiator_cluster_name = cluster_name_from_settings;
auto remote_initiator_cluster = getClusterImpl(context, remote_initiator_cluster_name);
auto storage_and_context = convertToRemote(remote_initiator_cluster, context, remote_initiator_cluster_name, query_to_send);
Comment on lines +362 to +366
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Strip initiator-only cluster setting before forwarding query

When object_storage_remote_initiator_cluster is set, the query forwarded via remote(...) still carries that setting even though convertToRemote only clears object_storage_remote_initiator. This makes remote execution depend on remote nodes understanding a setting that is only needed on the initiator; in mixed-version/rolling-upgrade clusters, older remote hosts can fail with unknown-setting errors before execution. The forwarded AST settings should drop object_storage_remote_initiator_cluster together with object_storage_remote_initiator.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ianton-ru does it make sense? it looks like it does

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

object_storage_remote_initiator_cluster does nothing without ``object_storage_remote_initiator`. Make sense to remove just for less garbage in sub-query.

auto src_distributed = std::dynamic_pointer_cast<StorageDistributed>(storage_and_context.storage);
auto modified_query_info = query_info;
modified_query_info.cluster = src_distributed->getCluster();
Expand All @@ -363,6 +372,8 @@ void IStorageCluster::read(
return;
}

auto cluster = getClusterImpl(context, cluster_name_from_settings, isObjectStorage() ? settings[Setting::object_storage_max_nodes] : 0);

RestoreQualifiedNamesVisitor::Data data;
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_to_send->as<ASTSelectQuery &>(), 0));
data.remote_table.database = context->getCurrentDatabase();
Expand Down Expand Up @@ -396,6 +407,10 @@ IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote(
const std::string & cluster_name_from_settings,
ASTPtr query_to_send)
{
/// TODO: Allow to use secret for remote queries
if (!cluster->getSecret().empty())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Can't convert query to remote when cluster uses secret");

auto host_addresses = cluster->getShardsAddresses();
if (host_addresses.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty cluster {}", cluster_name_from_settings);
Expand All @@ -417,6 +432,7 @@ IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote(
/// Clean object_storage_remote_initiator setting to avoid infinite remote call
auto new_context = Context::createCopy(context);
new_context->setSetting("object_storage_remote_initiator", false);
new_context->setSetting("object_storage_remote_initiator_cluster", false);

auto * select_query = query_to_send->as<ASTSelectQuery>();
if (!select_query)
Expand All @@ -436,7 +452,20 @@ IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote(
if (!table_expression)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table expression");

auto remote_query = makeASTFunction(remote_function_name, make_intrusive<ASTLiteral>(host_name), table_expression->table_function);
boost::intrusive_ptr<ASTFunction> remote_query;

if (shard_addresses[0].user_specified)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A comment please

{ // with user/password for clsuter access remote query is executed from this user, add it in query parameters
remote_query = makeASTFunction(remote_function_name,
make_intrusive<ASTLiteral>(host_name),
table_expression->table_function,
make_intrusive<ASTLiteral>(shard_addresses[0].user),
make_intrusive<ASTLiteral>(shard_addresses[0].password));
}
else
{ // without specified user/password remote query is executed from default user
remote_query = makeASTFunction(remote_function_name, make_intrusive<ASTLiteral>(host_name), table_expression->table_function);
}

table_expression->table_function = remote_query;

Expand Down
1 change: 1 addition & 0 deletions src/Storages/StorageDistributed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,7 @@ QueryTreeNodePtr buildQueryTreeDistributed(SelectQueryInfo & query_info,

auto table_function_node = std::make_shared<TableFunctionNode>(remote_table_function_node.getFunctionName());
table_function_node->getArgumentsNode() = remote_table_function_node.getArgumentsNode();
table_function_node->setSettingsChanges(remote_table_function_node.getSettingsChanges());

if (table_expression_modifiers)
table_function_node->setTableExpressionModifiers(*table_expression_modifiers);
Expand Down
8 changes: 5 additions & 3 deletions src/TableFunctions/ITableFunctionCluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ class ITableFunctionCluster : public Base

/// Cluster name is always the first
cluster_name = checkAndGetLiteralArgument<String>(args[0], "cluster_name");

if (!context->tryGetCluster(cluster_name))
throw Exception(ErrorCodes::CLUSTER_DOESNT_EXIST, "Requested cluster '{}' not found", cluster_name);
/// Remove check cluster existing here
/// In query like
/// remote('remote_host', xxxCluster('remote_cluster', ...))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the query is not remote? Can't we check for that?

/// If cluster not exists, query falls later

Where and with which exception? It would be good to avoid any network calls before failing

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cluster name is not a network node name, it's an internal ClickHouse name. Query falls later when tries to get hosts from cluster. Network calls can't be made without hosts.

But it's hard to understand here is cluster function inside 'remote' or not.

/// 'remote_cluster' can be defined only on 'remote_host'
/// If cluster not exists, query falls later

/// Just cut the first arg (cluster_name) and try to parse other table function arguments as is
args.erase(args.begin());
Expand Down
33 changes: 33 additions & 0 deletions tests/integration/test_s3_cluster/configs/cluster.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,39 @@
</shard>
</cluster_with_dots>

<cluster_with_username_and_password>
<shard>
<replica>
<host>s0_0_1</host>
<port>9000</port>
<user>foo</user>
<password>bar</password>
</replica>
<replica>
<host>s0_1_0</host>
<port>9000</port>
<user>foo</user>
<password>bar</password>
</replica>
</shard>
</cluster_with_username_and_password>

<cluster_with_secret>
<secret>baz</secret>
<shard>
<replica>
<host>s0_0_1</host>
<port>9000</port>
<user>foo</user>
</replica>
<replica>
<host>s0_1_0</host>
<port>9000</port>
<user>foo</user>
</replica>
</shard>
</cluster_with_secret>

<cluster_all>
<shard>
<replica>
Expand Down
20 changes: 20 additions & 0 deletions tests/integration/test_s3_cluster/configs/hidden_clusters.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<clickhouse>
<remote_servers>
<hidden_cluster_with_username_and_password>
<shard>
<replica>
<host>s0_0_1</host>
<port>9000</port>
<user>foo</user>
<password>bar</password>
</replica>
<replica>
<host>s0_1_0</host>
<port>9000</port>
<user>foo</user>
<password>bar</password>
</replica>
</shard>
</hidden_cluster_with_username_and_password>
</remote_servers>
</clickhouse>
4 changes: 4 additions & 0 deletions tests/integration/test_s3_cluster/configs/users.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,9 @@
<profile>default</profile>
<named_collection_control>1</named_collection_control>
</default>
<foo>
<password>bar</password>
<profile>default</profile>
</foo>
</users>
</clickhouse>
Loading
Loading