Skip to content

Commit 06e34b3

Browse files
Enmkianton-ru
authored andcommitted
Merge pull request #950 from Altinity/feature/antalya-25.6.5/fix_remote_calls
25.6.5 Antalya port #583, #584, #703, #720 - fixes for s3Cluster distributed calls
1 parent 61c1d5c commit 06e34b3

File tree

19 files changed

+444
-52
lines changed

19 files changed

+444
-52
lines changed

src/Interpreters/ClusterProxy/executeQuery.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,7 @@ void executeQuery(
464464
not_optimized_cluster->getName());
465465

466466
read_from_remote->setStepDescription("Read from remote replica");
467+
read_from_remote->setIsRemoteFunction(is_remote_function);
467468
plan->addStep(std::move(read_from_remote));
468469
plan->addInterpreterContext(new_context);
469470
plans.emplace_back(std::move(plan));

src/Interpreters/Context.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2978,8 +2978,11 @@ void Context::setCurrentQueryId(const String & query_id)
29782978

29792979
client_info.current_query_id = query_id_to_set;
29802980

2981-
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
2981+
if (client_info.query_kind == ClientInfo::QueryKind::INITIAL_QUERY
2982+
&& (getApplicationType() != ApplicationType::SERVER || client_info.initial_query_id.empty()))
2983+
{
29822984
client_info.initial_query_id = client_info.current_query_id;
2985+
}
29832986
}
29842987

29852988
void Context::setBackgroundOperationTypeForContext(ClientInfo::BackgroundOperationType background_operation)

src/Interpreters/InterpreterSelectQuery.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
#include <Processors/QueryPlan/TotalsHavingStep.h>
7272
#include <Processors/QueryPlan/WindowStep.h>
7373
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
74+
#include <Processors/QueryPlan/ObjectFilterStep.h>
7475
#include <Processors/Sources/NullSource.h>
7576
#include <Processors/Sources/SourceFromSingleChunk.h>
7677
#include <Processors/Transforms/AggregatingTransform.h>
@@ -83,6 +84,7 @@
8384
#include <Storages/StorageValues.h>
8485
#include <Storages/StorageView.h>
8586
#include <Storages/ReadInOrderOptimizer.h>
87+
#include <Storages/IStorageCluster.h>
8688

8789
#include <Columns/Collator.h>
8890
#include <Columns/ColumnAggregateFunction.h>
@@ -195,6 +197,7 @@ namespace Setting
195197
extern const SettingsUInt64 max_rows_to_transfer;
196198
extern const SettingsOverflowMode transfer_overflow_mode;
197199
extern const SettingsString implicit_table_at_top_level;
200+
extern const SettingsBool use_hive_partitioning;
198201
}
199202

200203
namespace ServerSetting
@@ -1976,6 +1979,22 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
19761979

19771980
if (expressions.second_stage || from_aggregation_stage)
19781981
{
1982+
if (settings[Setting::use_hive_partitioning]
1983+
&& !expressions.first_stage
1984+
&& expressions.hasWhere())
1985+
{
1986+
if (typeid_cast<ReadFromCluster *>(query_plan.getRootNode()->step.get()))
1987+
{
1988+
auto object_filter_step = std::make_unique<ObjectFilterStep>(
1989+
query_plan.getCurrentHeader(),
1990+
expressions.before_where->dag.clone(),
1991+
getSelectQuery().where()->getColumnName());
1992+
1993+
object_filter_step->setStepDescription("WHERE");
1994+
query_plan.addStep(std::move(object_filter_step));
1995+
}
1996+
}
1997+
19791998
if (from_aggregation_stage)
19801999
{
19812000
/// No need to aggregate anything, since this was done on remote shards.

src/Planner/Planner.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include <Processors/QueryPlan/WindowStep.h>
4040
#include <Processors/QueryPlan/ReadNothingStep.h>
4141
#include <Processors/QueryPlan/ReadFromRecursiveCTEStep.h>
42+
#include <Processors/QueryPlan/ObjectFilterStep.h>
4243
#include <QueryPipeline/QueryPipelineBuilder.h>
4344

4445
#include <Interpreters/Context.h>
@@ -53,6 +54,7 @@
5354
#include <Storages/StorageDummy.h>
5455
#include <Storages/StorageMerge.h>
5556
#include <Storages/ObjectStorage/StorageObjectStorageCluster.h>
57+
#include <Storages/IStorageCluster.h>
5658

5759
#include <AggregateFunctions/IAggregateFunction.h>
5860

@@ -144,6 +146,7 @@ namespace Setting
144146
extern const SettingsUInt64 max_rows_to_transfer;
145147
extern const SettingsOverflowMode transfer_overflow_mode;
146148
extern const SettingsBool enable_parallel_blocks_marshalling;
149+
extern const SettingsBool use_hive_partitioning;
147150
}
148151

149152
namespace ServerSetting
@@ -473,6 +476,19 @@ void addFilterStep(
473476
query_plan.addStep(std::move(where_step));
474477
}
475478

479+
void addObjectFilterStep(QueryPlan & query_plan,
480+
FilterAnalysisResult & filter_analysis_result,
481+
const std::string & step_description)
482+
{
483+
auto actions = std::move(filter_analysis_result.filter_actions->dag);
484+
485+
auto where_step = std::make_unique<ObjectFilterStep>(query_plan.getCurrentHeader(),
486+
std::move(actions),
487+
filter_analysis_result.filter_column_name);
488+
where_step->setStepDescription(step_description);
489+
query_plan.addStep(std::move(where_step));
490+
}
491+
476492
Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context,
477493
const AggregationAnalysisResult & aggregation_analysis_result,
478494
const QueryAnalysisResult & query_analysis_result,
@@ -1787,6 +1803,16 @@ void Planner::buildPlanForQueryNode()
17871803

17881804
if (query_processing_info.isSecondStage() || query_processing_info.isFromAggregationState())
17891805
{
1806+
if (settings[Setting::use_hive_partitioning]
1807+
&& !query_processing_info.isFirstStage()
1808+
&& expression_analysis_result.hasWhere())
1809+
{
1810+
if (typeid_cast<ReadFromCluster *>(query_plan.getRootNode()->step.get()))
1811+
{
1812+
addObjectFilterStep(query_plan, expression_analysis_result.getWhere(), "WHERE");
1813+
}
1814+
}
1815+
17901816
if (query_processing_info.isFromAggregationState())
17911817
{
17921818
/// Aggregation was performed on remote shards
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#include <Processors/QueryPlan/ObjectFilterStep.h>
2+
#include <Processors/QueryPlan/QueryPlanStepRegistry.h>
3+
#include <Processors/QueryPlan/Serialization.h>
4+
#include <Processors/Transforms/FilterTransform.h>
5+
#include <IO/Operators.h>
6+
7+
#include <memory>
8+
9+
namespace DB
10+
{
11+
12+
namespace ErrorCodes
13+
{
14+
extern const int INCORRECT_DATA;
15+
}
16+
17+
ObjectFilterStep::ObjectFilterStep(
18+
SharedHeader input_header_,
19+
ActionsDAG actions_dag_,
20+
String filter_column_name_)
21+
: actions_dag(std::move(actions_dag_))
22+
, filter_column_name(std::move(filter_column_name_))
23+
{
24+
input_headers.emplace_back(input_header_);
25+
output_header = input_headers.front();
26+
}
27+
28+
QueryPipelineBuilderPtr ObjectFilterStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & /* settings */)
29+
{
30+
return std::move(pipelines.front());
31+
}
32+
33+
void ObjectFilterStep::updateOutputHeader()
34+
{
35+
output_header = input_headers.front();
36+
}
37+
38+
void ObjectFilterStep::serialize(Serialization & ctx) const
39+
{
40+
writeStringBinary(filter_column_name, ctx.out);
41+
42+
actions_dag.serialize(ctx.out, ctx.registry);
43+
}
44+
45+
std::unique_ptr<IQueryPlanStep> ObjectFilterStep::deserialize(Deserialization & ctx)
46+
{
47+
if (ctx.input_headers.size() != 1)
48+
throw Exception(ErrorCodes::INCORRECT_DATA, "ObjectFilterStep must have one input stream");
49+
50+
String filter_column_name;
51+
readStringBinary(filter_column_name, ctx.in);
52+
53+
ActionsDAG actions_dag = ActionsDAG::deserialize(ctx.in, ctx.registry, ctx.context);
54+
55+
return std::make_unique<ObjectFilterStep>(ctx.input_headers.front(), std::move(actions_dag), std::move(filter_column_name));
56+
}
57+
58+
void registerObjectFilterStep(QueryPlanStepRegistry & registry)
59+
{
60+
registry.registerStep("ObjectFilter", ObjectFilterStep::deserialize);
61+
}
62+
63+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#pragma once
2+
#include <Processors/QueryPlan/IQueryPlanStep.h>
3+
#include <Interpreters/ActionsDAG.h>
4+
5+
namespace DB
6+
{
7+
8+
/// Implements WHERE operation.
9+
class ObjectFilterStep : public IQueryPlanStep
10+
{
11+
public:
12+
ObjectFilterStep(
13+
SharedHeader input_header_,
14+
ActionsDAG actions_dag_,
15+
String filter_column_name_);
16+
17+
String getName() const override { return "ObjectFilter"; }
18+
QueryPipelineBuilderPtr updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & settings) override;
19+
20+
const ActionsDAG & getExpression() const { return actions_dag; }
21+
ActionsDAG & getExpression() { return actions_dag; }
22+
const String & getFilterColumnName() const { return filter_column_name; }
23+
24+
void serialize(Serialization & ctx) const override;
25+
26+
static std::unique_ptr<IQueryPlanStep> deserialize(Deserialization & ctx);
27+
28+
private:
29+
void updateOutputHeader() override;
30+
31+
ActionsDAG actions_dag;
32+
String filter_column_name;
33+
};
34+
35+
}

src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <Processors/QueryPlan/FilterStep.h>
44
#include <Processors/QueryPlan/LimitStep.h>
55
#include <Processors/QueryPlan/SourceStepWithFilter.h>
6+
#include <Processors/QueryPlan/ObjectFilterStep.h>
67

78
namespace DB::QueryPlanOptimizations
89
{
@@ -41,6 +42,10 @@ void optimizePrimaryKeyConditionAndLimit(const Stack & stack)
4142
/// So this is likely not needed.
4243
continue;
4344
}
45+
else if (auto * object_filter_step = typeid_cast<ObjectFilterStep *>(iter->node->step.get()))
46+
{
47+
source_step_with_filter->addFilter(object_filter_step->getExpression().clone(), object_filter_step->getFilterColumnName());
48+
}
4449
else
4550
{
4651
break;

src/Processors/QueryPlan/QueryPlanStepRegistry.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ void registerFilterStep(QueryPlanStepRegistry & registry);
5050
void registerTotalsHavingStep(QueryPlanStepRegistry & registry);
5151
void registerExtremesStep(QueryPlanStepRegistry & registry);
5252
void registerJoinStep(QueryPlanStepRegistry & registry);
53+
void registerObjectFilterStep(QueryPlanStepRegistry & registry);
5354

5455
void registerReadFromTableStep(QueryPlanStepRegistry & registry);
5556
void registerReadFromTableFunctionStep(QueryPlanStepRegistry & registry);
@@ -75,6 +76,7 @@ void QueryPlanStepRegistry::registerPlanSteps()
7576

7677
registerReadFromTableStep(registry);
7778
registerReadFromTableFunctionStep(registry);
79+
registerObjectFilterStep(registry);
7880
}
7981

8082
}

src/Processors/QueryPlan/ReadFromRemote.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,8 @@ void ReadFromRemote::addLazyPipe(
508508
my_stage = stage, my_storage = storage,
509509
add_agg_info, add_totals, add_extremes, async_read, async_query_sending,
510510
query_tree = shard.query_tree, planner_context = shard.planner_context,
511-
pushed_down_filters, parallel_marshalling_threads]() mutable
511+
pushed_down_filters, parallel_marshalling_threads,
512+
my_is_remote_function = is_remote_function]() mutable
512513
-> QueryPipelineBuilder
513514
{
514515
auto current_settings = my_context->getSettingsRef();
@@ -603,6 +604,8 @@ void ReadFromRemote::addLazyPipe(
603604
{DataTypeUInt32().createColumnConst(1, my_shard.shard_info.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
604605
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
605606
std::move(connections), query_string, header, my_context, my_throttler, my_scalars, my_external_tables, stage_to_use, my_shard.query_plan);
607+
remote_query_executor->setRemoteFunction(my_is_remote_function);
608+
remote_query_executor->setShardCount(my_shard_count);
606609

607610
auto pipe = createRemoteSourcePipe(
608611
remote_query_executor, add_agg_info, add_totals, add_extremes, async_read, async_query_sending, parallel_marshalling_threads);
@@ -693,6 +696,8 @@ void ReadFromRemote::addPipe(
693696
priority_func);
694697
remote_query_executor->setLogger(log);
695698
remote_query_executor->setPoolMode(PoolMode::GET_ONE);
699+
remote_query_executor->setRemoteFunction(is_remote_function);
700+
remote_query_executor->setShardCount(shard_count);
696701

697702
if (!table_func_ptr)
698703
remote_query_executor->setMainTable(shard.main_table ? shard.main_table : main_table);
@@ -713,6 +718,8 @@ void ReadFromRemote::addPipe(
713718
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
714719
shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage_to_use, shard.query_plan);
715720
remote_query_executor->setLogger(log);
721+
remote_query_executor->setRemoteFunction(is_remote_function);
722+
remote_query_executor->setShardCount(shard_count);
716723

717724
if (context->canUseTaskBasedParallelReplicas() || parallel_replicas_disabled)
718725
{

src/Processors/QueryPlan/ReadFromRemote.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class ReadFromRemote final : public SourceStepWithFilterBase
4545

4646
void enableMemoryBoundMerging();
4747
void enforceAggregationInOrder(const SortDescription & sort_description);
48+
void setIsRemoteFunction(bool is_remote_function_ = true) { is_remote_function = is_remote_function_; }
4849

4950
bool hasSerializedPlan() const;
5051

@@ -62,6 +63,7 @@ class ReadFromRemote final : public SourceStepWithFilterBase
6263
UInt32 shard_count;
6364
const String cluster_name;
6465
std::optional<GetPriorityForLoadBalancing> priority_func_factory;
66+
bool is_remote_function = false;
6567

6668
Pipes addPipes(const ClusterProxy::SelectStreamFactory::Shards & used_shards, const SharedHeader & out_header);
6769

0 commit comments

Comments
 (0)