Skip to content

Commit 1dc8e15

Browse files
authored
Merge pull request #1052 from Altinity/frontport/antalya-25.8/fix_remote_calls
25.8 Antalya port: fixes for s3Cluster distributed calls
2 parents 2e84498 + 515cf56 commit 1dc8e15

27 files changed

+834
-60
lines changed

src/Core/Settings.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1760,6 +1760,22 @@ Possible values:
17601760
- `global` — Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.`
17611761
- `allow` — Allows the use of these types of subqueries.
17621762
)", IMPORTANT) \
1763+
DECLARE(ObjectStorageClusterJoinMode, object_storage_cluster_join_mode, ObjectStorageClusterJoinMode::ALLOW, R"(
1764+
Changes the behaviour of object storage cluster function or table.
1765+
1766+
ClickHouse applies this setting when the query contains the product of object storage cluster function ot table, i.e. when the query for a object storage cluster function ot table contains a non-GLOBAL subquery for the object storage cluster function ot table.
1767+
1768+
Restrictions:
1769+
1770+
- Only applied for JOIN subqueries.
1771+
- Only if the FROM section uses a object storage cluster function ot table.
1772+
1773+
Possible values:
1774+
1775+
- `local` — Replaces the database and table in the subquery with local ones for the destination server (shard), leaving the normal `IN`/`JOIN.`
1776+
- `global` — Unsupported for now. Replaces the `IN`/`JOIN` query with `GLOBAL IN`/`GLOBAL JOIN.`
1777+
- `allow` — Default value. Allows the use of these types of subqueries.
1778+
)", 0) \
17631779
\
17641780
DECLARE(UInt64, max_concurrent_queries_for_all_users, 0, R"(
17651781
Throw exception if the value of this setting is less or equal than the current number of simultaneously processed queries.

src/Core/Settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class WriteBuffer;
5858
M(CLASS_NAME, DistributedCachePoolBehaviourOnLimit) /* Cloud only */ \
5959
M(CLASS_NAME, DistributedDDLOutputMode) \
6060
M(CLASS_NAME, DistributedProductMode) \
61+
M(CLASS_NAME, ObjectStorageClusterJoinMode) \
6162
M(CLASS_NAME, Double) \
6263
M(CLASS_NAME, EscapingRule) \
6364
M(CLASS_NAME, Float) \

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
4141
/// Note: please check if the key already exists to prevent duplicate entries.
4242
addSettingsChanges(settings_changes_history, "25.8.9.2000",
4343
{
44+
{"object_storage_cluster_join_mode", "allow", "allow", "New setting"},
4445
{"lock_object_storage_task_distribution_ms", 500, 500, "Raised the value to 500 to avoid hoping tasks between executors."},
4546
{"object_storage_cluster", "", "", "Antalya: New setting"},
4647
{"object_storage_max_nodes", 0, 0, "Antalya: New setting"},

src/Core/SettingsEnums.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@ IMPLEMENT_SETTING_ENUM(DistributedProductMode, ErrorCodes::UNKNOWN_DISTRIBUTED_P
9090
{"global", DistributedProductMode::GLOBAL},
9191
{"allow", DistributedProductMode::ALLOW}})
9292

93+
IMPLEMENT_SETTING_ENUM(ObjectStorageClusterJoinMode, ErrorCodes::BAD_ARGUMENTS,
94+
{{"local", ObjectStorageClusterJoinMode::LOCAL},
95+
{"global", ObjectStorageClusterJoinMode::GLOBAL},
96+
{"allow", ObjectStorageClusterJoinMode::ALLOW}})
97+
9398

9499
IMPLEMENT_SETTING_ENUM(QueryResultCacheNondeterministicFunctionHandling, ErrorCodes::BAD_ARGUMENTS,
95100
{{"throw", QueryResultCacheNondeterministicFunctionHandling::Throw},

src/Core/SettingsEnums.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,16 @@ enum class DistributedProductMode : uint8_t
163163

164164
DECLARE_SETTING_ENUM(DistributedProductMode)
165165

166+
/// The setting for executing object storage cluster function ot table JOIN sections.
167+
enum class ObjectStorageClusterJoinMode : uint8_t
168+
{
169+
LOCAL, /// Convert to local query
170+
GLOBAL, /// Convert to global query
171+
ALLOW /// Enable
172+
};
173+
174+
DECLARE_SETTING_ENUM(ObjectStorageClusterJoinMode)
175+
166176
/// How the query result cache handles queries with non-deterministic functions, e.g. now()
167177
enum class QueryResultCacheNondeterministicFunctionHandling : uint8_t
168178
{

src/Interpreters/ClusterProxy/executeQuery.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,7 @@ void executeQuery(
464464
not_optimized_cluster->getName());
465465

466466
read_from_remote->setStepDescription("Read from remote replica");
467+
read_from_remote->setIsRemoteFunction(is_remote_function);
467468
plan->addStep(std::move(read_from_remote));
468469
plan->addInterpreterContext(new_context);
469470
plans.emplace_back(std::move(plan));

src/Interpreters/Context.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2982,8 +2982,11 @@ void Context::setCurrentQueryId(const String & query_id)
29822982

29832983
client_info.current_query_id = query_id_to_set;
29842984

2985-
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
2985+
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY
2986+
&& (getApplicationType() != ApplicationType::SERVER || client_info.initial_query_id.empty()))
2987+
{
29862988
client_info.initial_query_id = client_info.current_query_id;
2989+
}
29872990
}
29882991

29892992
void Context::setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType background_operation)

src/Interpreters/InterpreterSelectQuery.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
#include <Processors/QueryPlan/TotalsHavingStep.h>
7272
#include <Processors/QueryPlan/WindowStep.h>
7373
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
74+
#include <Processors/QueryPlan/ObjectFilterStep.h>
7475
#include <Processors/Sources/NullSource.h>
7576
#include <Processors/Sources/SourceFromSingleChunk.h>
7677
#include <Processors/Transforms/AggregatingTransform.h>
@@ -83,6 +84,7 @@
8384
#include <Storages/StorageValues.h>
8485
#include <Storages/StorageView.h>
8586
#include <Storages/ReadInOrderOptimizer.h>
87+
#include <Storages/IStorageCluster.h>
8688

8789
#include <Columns/Collator.h>
8890
#include <Columns/ColumnAggregateFunction.h>
@@ -195,6 +197,7 @@ namespace Setting
195197
extern const SettingsUInt64 max_rows_to_transfer;
196198
extern const SettingsOverflowMode transfer_overflow_mode;
197199
extern const SettingsString implicit_table_at_top_level;
200+
extern const SettingsBool use_hive_partitioning;
198201
}
199202

200203
namespace ServerSetting
@@ -1976,6 +1979,22 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
19761979

19771980
if (expressions.second_stage || from_aggregation_stage)
19781981
{
1982+
if (settings[Setting::use_hive_partitioning]
1983+
&& !expressions.first_stage
1984+
&& expressions.hasWhere())
1985+
{
1986+
if (typeid_cast<ReadFromCluster *>(query_plan.getRootNode()->step.get()))
1987+
{
1988+
auto object_filter_step = std::make_unique<ObjectFilterStep>(
1989+
query_plan.getCurrentHeader(),
1990+
expressions.before_where->dag.clone(),
1991+
getSelectQuery().where()->getColumnName());
1992+
1993+
object_filter_step->setStepDescription("WHERE");
1994+
query_plan.addStep(std::move(object_filter_step));
1995+
}
1996+
}
1997+
19791998
if (from_aggregation_stage)
19801999
{
19812000
/// No need to aggregate anything, since this was done on remote shards.

src/Planner/Planner.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include <Processors/QueryPlan/WindowStep.h>
4040
#include <Processors/QueryPlan/ReadNothingStep.h>
4141
#include <Processors/QueryPlan/ReadFromRecursiveCTEStep.h>
42+
#include <Processors/QueryPlan/ObjectFilterStep.h>
4243
#include <QueryPipeline/QueryPipelineBuilder.h>
4344

4445
#include <Interpreters/Context.h>
@@ -53,6 +54,7 @@
5354
#include <Storages/StorageDummy.h>
5455
#include <Storages/StorageMerge.h>
5556
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>
57+
#include <Storages/IStorageCluster.h>
5658

5759
#include <AggregateFunctions/IAggregateFunction.h>
5860

@@ -144,6 +146,7 @@ namespace Setting
144146
extern const SettingsUInt64 max_rows_to_transfer;
145147
extern const SettingsOverflowMode transfer_overflow_mode;
146148
extern const SettingsBool enable_parallel_blocks_marshalling;
149+
extern const SettingsBool use_hive_partitioning;
147150
}
148151

149152
namespace ServerSetting
@@ -473,6 +476,19 @@ void addFilterStep(
473476
query_plan.addStep(std::move(where_step));
474477
}
475478

479+
void addObjectFilterStep(QueryPlan & query_plan,
480+
FilterAnalysisResult & filter_analysis_result,
481+
const std::string & step_description)
482+
{
483+
auto actions = std::move(filter_analysis_result.filter_actions->dag);
484+
485+
auto where_step = std::make_unique<ObjectFilterStep>(query_plan.getCurrentHeader(),
486+
std::move(actions),
487+
filter_analysis_result.filter_column_name);
488+
where_step->setStepDescription(step_description);
489+
query_plan.addStep(std::move(where_step));
490+
}
491+
476492
Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context,
477493
const AggregationAnalysisResult & aggregation_analysis_result,
478494
const QueryAnalysisResult & query_analysis_result,
@@ -1787,6 +1803,16 @@ void Planner::buildPlanForQueryNode()
17871803

17881804
if (query_processing_info.isSecondStage() || query_processing_info.isFromAggregationState())
17891805
{
1806+
if (settings[Setting::use_hive_partitioning]
1807+
&& !query_processing_info.isFirstStage()
1808+
&& expression_analysis_result.hasWhere())
1809+
{
1810+
if (typeid_cast<ReadFromCluster *>(query_plan.getRootNode()->step.get()))
1811+
{
1812+
addObjectFilterStep(query_plan, expression_analysis_result.getWhere(), "WHERE");
1813+
}
1814+
}
1815+
17901816
if (query_processing_info.isFromAggregationState())
17911817
{
17921818
/// Aggregation was performed on remote shards

src/Planner/PlannerJoinTree.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1372,6 +1372,8 @@ JoinTreeQueryPlan buildQueryPlanForTableExpression(QueryTreeNodePtr table_expres
13721372
/// Hopefully there is no other case when we read from Distributed up to FetchColumns.
13731373
if (table_node && table_node->getStorage()->isRemote() && select_query_options.to_stage == QueryProcessingStage::FetchColumns)
13741374
updated_actions_dag_outputs.push_back(output_node);
1375+
else if (table_function_node && table_function_node->getStorage()->isRemote())
1376+
updated_actions_dag_outputs.push_back(output_node);
13751377
}
13761378
else
13771379
updated_actions_dag_outputs.push_back(&rename_actions_dag.addAlias(*output_node, *column_identifier));

0 commit comments

Comments
 (0)