Skip to content

Commit b28b8ed

Browse files
committed
Move some logic for additional table functions deeper into ClusterProxy::executeQuery
1 parent 4480bcf commit b28b8ed

File tree

4 files changed

+44
-84
lines changed

4 files changed

+44
-84
lines changed

src/Interpreters/ClusterProxy/executeQuery.cpp

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <Parsers/ASTInsertQuery.h>
1616
#include <Planner/Utils.h>
1717
#include <Processors/QueryPlan/ParallelReplicasLocalPlan.h>
18+
#include <Processors/QueryPlan/DistributedCreateLocalPlan.h>
1819
#include <Processors/QueryPlan/QueryPlan.h>
1920
#include <Processors/QueryPlan/ReadFromLocalReplica.h>
2021
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
@@ -333,7 +334,8 @@ void executeQuery(
333334
const std::string & sharding_key_column_name,
334335
const DistributedSettings & distributed_settings,
335336
AdditionalShardFilterGenerator shard_filter_generator,
336-
bool is_remote_function)
337+
bool is_remote_function,
338+
std::span<const SelectQueryInfo> additional_query_infos)
337339
{
338340
const Settings & settings = context->getSettingsRef();
339341

@@ -361,6 +363,7 @@ void executeQuery(
361363
new_context->increaseDistributedDepth();
362364

363365
const size_t shards = cluster->getShardCount();
366+
const bool has_additional_query_infos = !additional_query_infos.empty();
364367

365368
if (context->getSettingsRef()[Setting::allow_experimental_analyzer])
366369
{
@@ -470,6 +473,29 @@ void executeQuery(
470473
plans.emplace_back(std::move(plan));
471474
}
472475

476+
if (has_additional_query_infos)
477+
{
478+
if (!header)
479+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Header is not initialized for local hybrid plan creation");
480+
481+
const Block & header_block = *header;
482+
for (const auto & additional_query_info : additional_query_infos)
483+
{
484+
auto additional_plan = createLocalPlan(
485+
additional_query_info.query,
486+
header_block,
487+
context,
488+
processed_stage,
489+
0, /// shard_num is not applicable for local hybrid plans
490+
1, /// shard_count is not applicable for local hybrid plans
491+
false,
492+
false,
493+
"");
494+
495+
plans.emplace_back(std::move(additional_plan));
496+
}
497+
}
498+
473499
if (plans.empty())
474500
return;
475501

@@ -485,6 +511,8 @@ void executeQuery(
485511
input_headers.emplace_back(plan->getCurrentHeader());
486512

487513
auto union_step = std::make_unique<UnionStep>(std::move(input_headers));
514+
if (has_additional_query_infos)
515+
union_step->setStepDescription("Hybrid");
488516
query_plan.unitePlans(std::move(union_step), std::move(plans));
489517
}
490518

src/Interpreters/ClusterProxy/executeQuery.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <Parsers/IAST_fwd.h>
66

77
#include <optional>
8+
#include <span>
89

910
namespace DB
1011
{
@@ -88,7 +89,8 @@ void executeQuery(
8889
const std::string & sharding_key_column_name,
8990
const DistributedSettings & distributed_settings,
9091
AdditionalShardFilterGenerator shard_filter_generator,
91-
bool is_remote_function);
92+
bool is_remote_function,
93+
std::span<const SelectQueryInfo> additional_query_infos = {});
9294

9395
std::optional<QueryPipeline> executeInsertSelectWithParallelReplicas(
9496
const ASTInsertQuery & query_ast,

src/Storages/StorageDistributed.cpp

Lines changed: 6 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
#include <Disks/IDisk.h>
66

77
#include <QueryPipeline/RemoteQueryExecutor.h>
8-
#include <Processors/QueryPlan/DistributedCreateLocalPlan.h>
98

109
#include <DataTypes/DataTypeFactory.h>
1110
#include <DataTypes/DataTypeLowCardinality.h>
@@ -99,7 +98,6 @@
9998
#include <Processors/QueryPlan/QueryPlan.h>
10099
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
101100
#include <Processors/QueryPlan/ExpressionStep.h>
102-
#include <Processors/QueryPlan/UnionStep.h>
103101
#include <Processors/QueryPlan/Optimizations/actionsDAGUtils.h>
104102
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
105103
#include <Processors/Sources/NullSource.h>
@@ -1075,21 +1073,10 @@ void StorageDistributed::read(
10751073

10761074
SelectQueryInfo modified_query_info = query_info;
10771075

1078-
std::vector<SelectQueryInfo> all_query_infos;
1076+
std::vector<SelectQueryInfo> additional_query_infos;
10791077

10801078
const auto & settings = local_context->getSettingsRef();
10811079

1082-
// Disable memory_bound settings when additional table functions are present
1083-
// to avoid UNKNOWN_AGGREGATED_DATA_VARIANT when mixing different aggregation variants
1084-
// from remote shards (with memory_bound) and local layers (without memory_bound)
1085-
// FIXME: we can push additional_query_info into ClusterProxy::executeQuery to avoid this hack
1086-
// TODO: test is needed
1087-
if (!additional_table_functions.empty())
1088-
{
1089-
const_cast<Context *>(local_context.get())->setSetting("enable_memory_bound_merging_of_aggregation_results", false);
1090-
const_cast<Context *>(local_context.get())->setSetting("distributed_aggregation_memory_efficient", false);
1091-
}
1092-
10931080
if (settings[Setting::allow_experimental_analyzer])
10941081
{
10951082
StorageID remote_storage_id = StorageID::createEmpty();
@@ -1130,7 +1117,7 @@ void StorageDistributed::read(
11301117
additional_query_info.query = queryNodeToDistributedSelectQuery(additional_query_tree);
11311118
additional_query_info.query_tree = std::move(additional_query_tree);
11321119

1133-
all_query_infos.push_back(additional_query_info);
1120+
additional_query_infos.push_back(std::move(additional_query_info));
11341121
}
11351122
}
11361123

@@ -1169,7 +1156,7 @@ void StorageDistributed::read(
11691156
table_function_entry.predicate_ast);
11701157
}
11711158

1172-
all_query_infos.push_back(additional_query_info);
1159+
additional_query_infos.push_back(std::move(additional_query_info));
11731160
}
11741161
}
11751162

@@ -1187,7 +1174,7 @@ void StorageDistributed::read(
11871174

11881175
const auto & snapshot_data = assert_cast<const SnapshotData &>(*storage_snapshot->data);
11891176

1190-
if (!modified_query_info.getCluster()->getShardsInfo().empty())
1177+
if (!modified_query_info.getCluster()->getShardsInfo().empty() || !additional_query_infos.empty())
11911178
{
11921179
ClusterProxy::SelectStreamFactory select_stream_factory =
11931180
ClusterProxy::SelectStreamFactory(
@@ -1213,76 +1200,13 @@ void StorageDistributed::read(
12131200
sharding_key_column_name,
12141201
*distributed_settings,
12151202
shard_filter_generator,
1216-
is_remote_function);
1203+
is_remote_function,
1204+
additional_query_infos);
12171205

12181206
/// This is a bug: it is possible only when there are no shards to query, and that case is handled earlier.
12191207
if (!query_plan.isInitialized())
12201208
throw Exception(ErrorCodes::LOGICAL_ERROR, "Pipeline is not initialized");
12211209
}
1222-
1223-
std::vector<QueryPlan> additional_plans;
1224-
const Block * header_raw = header.get();
1225-
if (!header_raw)
1226-
throw Exception(ErrorCodes::LOGICAL_ERROR, "Header is not initialized for local plan creation");
1227-
1228-
const Block & header_block = *header_raw;
1229-
1230-
for (size_t i = 0; i < all_query_infos.size(); ++i)
1231-
{
1232-
auto additional_query_info = all_query_infos[i];
1233-
1234-
// This properly handles both analyzer and legacy modes with converting actions
1235-
auto additional_plan_ptr = createLocalPlan(
1236-
additional_query_info.query,
1237-
header_block,
1238-
local_context,
1239-
processed_stage,
1240-
0, // shard_num - not applicable for local plans
1241-
1, // shard_count - not applicable for local plans
1242-
false, // has_missing_objects
1243-
false, // build_logical_plan
1244-
""); // default_database
1245-
1246-
additional_plans.push_back(std::move(*additional_plan_ptr));
1247-
}
1248-
1249-
// Combine all plans using UnionStep
1250-
if (!additional_plans.empty())
1251-
{
1252-
// Convert QueryPlan objects to QueryPlanPtr
1253-
std::vector<QueryPlanPtr> plan_ptrs;
1254-
plan_ptrs.reserve(additional_plans.size() + (query_plan.isInitialized() ? 1 : 0));
1255-
1256-
// Add the main plan to the list
1257-
if (query_plan.isInitialized())
1258-
plan_ptrs.push_back(std::make_unique<QueryPlan>(std::move(query_plan)));
1259-
1260-
// Add additional plans
1261-
for (auto & plan : additional_plans)
1262-
{
1263-
plan_ptrs.push_back(std::make_unique<QueryPlan>(std::move(plan)));
1264-
}
1265-
1266-
// Create a new query plan that unions all the results
1267-
QueryPlan union_plan;
1268-
1269-
// Get headers from all plans
1270-
SharedHeaders headers;
1271-
headers.reserve(plan_ptrs.size());
1272-
for (const auto & plan_ptr : plan_ptrs)
1273-
{
1274-
headers.emplace_back(plan_ptr->getCurrentHeader());
1275-
}
1276-
1277-
// Create UnionStep to combine all plans
1278-
auto union_step = std::make_unique<UnionStep>(std::move(headers), 0);
1279-
union_step->setStepDescription("Hybrid");
1280-
1281-
union_plan.unitePlans(std::move(union_step), std::move(plan_ptrs));
1282-
1283-
// Replace the original query plan with the union plan
1284-
query_plan = std::move(union_plan);
1285-
}
12861210
}
12871211

12881212

tests/queries/0_stateless/03642_tiered_distributed.sql

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,3 +337,9 @@ DROP TABLE IF EXISTS test_tiered_watermark_before SYNC;
337337
-- Condition with dictGet('a1_watermarks_dict', ...)
338338

339339
-- access rights check
340+
341+
342+
-- TODO:
343+
-- add a test for distributed_aggregation_memory_efficient & enable_memory_bound_merging_of_aggregation_results
344+
-- verifying that UNKNOWN_AGGREGATED_DATA_VARIANT is not raised when mixing different aggregation variants
345+
-- from remote shards (with memory_bound) and local layers (without memory_bound)

0 commit comments

Comments
 (0)