Skip to content

Commit ae43913

Browse files
authored
Merge pull request ClickHouse#79599 from korowa/fix-remote-executor-pool-mode
Fix read from remote without hedged requests
2 parents 477887a + 5028524 commit ae43913

File tree

4 files changed

+35
-5
lines changed

4 files changed

+35
-5
lines changed

src/Client/HedgedConnections.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ namespace Setting
1818
{
1919
extern const SettingsBool allow_changing_replica_until_first_data_packet;
2020
extern const SettingsBool allow_experimental_analyzer;
21-
extern const SettingsUInt64 allow_experimental_parallel_reading_from_replicas;
2221
extern const SettingsUInt64 connections_with_failover_max_tries;
2322
extern const SettingsDialect dialect;
2423
extern const SettingsBool fallback_to_stale_replicas_for_distributed_queries;
@@ -54,9 +53,7 @@ HedgedConnections::HedgedConnections(
5453
timeouts_,
5554
context_->getSettingsRef()[Setting::connections_with_failover_max_tries].value,
5655
context_->getSettingsRef()[Setting::fallback_to_stale_replicas_for_distributed_queries].value,
57-
context_->getSettingsRef()[Setting::allow_experimental_parallel_reading_from_replicas].value > 0
58-
? context_->getSettingsRef()[Setting::max_parallel_replicas].value
59-
: 1,
56+
context_->getSettingsRef()[Setting::max_parallel_replicas].value,
6057
context_->getSettingsRef()[Setting::skip_unavailable_shards].value,
6158
table_to_check_,
6259
priority_func)

src/Processors/QueryPlan/ReadFromRemote.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ namespace DB
5050
{
5151
namespace Setting
5252
{
53+
extern const SettingsUInt64 allow_experimental_parallel_reading_from_replicas;
5354
extern const SettingsBool async_query_sending_for_remote;
5455
extern const SettingsBool async_socket_for_remote;
5556
extern const SettingsString cluster_for_parallel_replicas;
@@ -600,6 +601,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
600601
bool add_extremes = false;
601602
bool async_read = context->getSettingsRef()[Setting::async_socket_for_remote];
602603
bool async_query_sending = context->getSettingsRef()[Setting::async_query_sending_for_remote];
604+
bool parallel_replicas_disabled = context->getSettingsRef()[Setting::allow_experimental_parallel_reading_from_replicas] == 0;
603605
if (stage == QueryProcessingStage::Complete)
604606
{
605607
if (const auto * ast_select = shard.query->as<ASTSelectQuery>())
@@ -690,14 +692,16 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
690692
shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage_to_use, shard.query_plan);
691693
remote_query_executor->setLogger(log);
692694

693-
if (context->canUseTaskBasedParallelReplicas())
695+
if (context->canUseTaskBasedParallelReplicas() || parallel_replicas_disabled)
694696
{
695697
// when doing parallel reading from replicas (ParallelReplicasMode::READ_TASKS) on a shard:
696698
// establish a connection to a replica on the shard; the replica will instantiate a coordinator to manage parallel reading from replicas on the shard.
697699
// The coordinator will return query result from the shard.
698700
// Only one coordinator per shard is necessary. Therefore using PoolMode::GET_ONE to establish only one connection per shard.
699701
// Using PoolMode::GET_MANY for this mode can lead to the instantiation of several coordinators (depending on the max_parallel_replicas setting)
700702
// each will execute parallel reading from replicas, so the query result will be multiplied by the number of created coordinators
703+
//
704+
// When parallel replicas are disabled, there should also be a single connection to each shard to prevent result duplication
701705
remote_query_executor->setPoolMode(PoolMode::GET_ONE);
702706
}
703707
else
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
remote() 1 shard: 100
2+
remote() 3 shards: 300
3+
Distributed: 100
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
drop table if exists 03444_local;
2+
drop table if exists 03444_distr;
3+
4+
create table 03444_local (id UInt32) engine = MergeTree order by id as select * from numbers(100);
5+
create table 03444_distr (id UInt32) engine = Distributed(test_cluster_one_shard_two_replicas, currentDatabase(), 03444_local);
6+
7+
select 'remote() 1 shard: ' || count()
8+
from remote('127.0.0.1|127.0.0.2|127.0.0.3', currentDatabase(), 03444_local)
9+
settings prefer_localhost_replica = 0,
10+
use_hedged_requests = 0,
11+
max_parallel_replicas = 100;
12+
13+
select 'remote() 3 shards: ' || count()
14+
from remote('127.0.0.1|127.0.0.2,127.0.0.3|127.0.0.4,127.0.0.5|127.0.0.6', currentDatabase(), 03444_local)
15+
settings prefer_localhost_replica = 0,
16+
use_hedged_requests = 0,
17+
max_parallel_replicas = 100;
18+
19+
select 'Distributed: ' || count()
20+
from 03444_distr
21+
settings prefer_localhost_replica = 0,
22+
use_hedged_requests = 0,
23+
max_parallel_replicas = 100;
24+
25+
drop table 03444_local;
26+
drop table 03444_distr;

0 commit comments

Comments
 (0)