Skip to content

Commit 5028524

Browse files
committed
Fix read from remote without hedged requests
In case of disabled parallel reading from replicas, hedged connections will read from single replica, but when hedged requests are disabled, the query will use `max_parallel_replicas`. This patch enforces client to use single replica in case of disabled parallel replicas reading with `PoolMode::GET_ONE`, and removes this logic from `HedgedConnections`.
1 parent 5ce1364 commit 5028524

File tree

4 files changed

+35
-5
lines changed

4 files changed

+35
-5
lines changed

src/Client/HedgedConnections.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ namespace Setting
1818
{
1919
extern const SettingsBool allow_changing_replica_until_first_data_packet;
2020
extern const SettingsBool allow_experimental_analyzer;
21-
extern const SettingsUInt64 allow_experimental_parallel_reading_from_replicas;
2221
extern const SettingsUInt64 connections_with_failover_max_tries;
2322
extern const SettingsDialect dialect;
2423
extern const SettingsBool fallback_to_stale_replicas_for_distributed_queries;
@@ -54,9 +53,7 @@ HedgedConnections::HedgedConnections(
5453
timeouts_,
5554
context_->getSettingsRef()[Setting::connections_with_failover_max_tries].value,
5655
context_->getSettingsRef()[Setting::fallback_to_stale_replicas_for_distributed_queries].value,
57-
context_->getSettingsRef()[Setting::allow_experimental_parallel_reading_from_replicas].value > 0
58-
? context_->getSettingsRef()[Setting::max_parallel_replicas].value
59-
: 1,
56+
context_->getSettingsRef()[Setting::max_parallel_replicas].value,
6057
context_->getSettingsRef()[Setting::skip_unavailable_shards].value,
6158
table_to_check_,
6259
priority_func)

src/Processors/QueryPlan/ReadFromRemote.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ namespace DB
5050
{
5151
namespace Setting
5252
{
53+
extern const SettingsUInt64 allow_experimental_parallel_reading_from_replicas;
5354
extern const SettingsBool async_query_sending_for_remote;
5455
extern const SettingsBool async_socket_for_remote;
5556
extern const SettingsString cluster_for_parallel_replicas;
@@ -600,6 +601,7 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
600601
bool add_extremes = false;
601602
bool async_read = context->getSettingsRef()[Setting::async_socket_for_remote];
602603
bool async_query_sending = context->getSettingsRef()[Setting::async_query_sending_for_remote];
604+
bool parallel_replicas_disabled = context->getSettingsRef()[Setting::allow_experimental_parallel_reading_from_replicas] == 0;
603605
if (stage == QueryProcessingStage::Complete)
604606
{
605607
if (const auto * ast_select = shard.query->as<ASTSelectQuery>())
@@ -690,14 +692,16 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::SelectStreamFact
690692
shard.shard_info.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage_to_use, shard.query_plan);
691693
remote_query_executor->setLogger(log);
692694

693-
if (context->canUseTaskBasedParallelReplicas())
695+
if (context->canUseTaskBasedParallelReplicas() || parallel_replicas_disabled)
694696
{
695697
// when doing parallel reading from replicas (ParallelReplicasMode::READ_TASKS) on a shard:
696698
// establish a connection to a replica on the shard, the replica will instantiate coordinator to manage parallel reading from replicas on the shard.
697699
// The coordinator will return query result from the shard.
698700
// Only one coordinator per shard is necessary. Therefore using PoolMode::GET_ONE to establish only one connection per shard.
699701
// Using PoolMode::GET_MANY for this mode will(can) lead to instantiation of several coordinators (depends on max_parallel_replicas setting)
700702
// each will execute parallel reading from replicas, so the query result will be multiplied by the number of created coordinators
703+
//
704+
// In case parallel replicas are disabled, there also should be a single connection to each shard to prevent result duplication
701705
remote_query_executor->setPoolMode(PoolMode::GET_ONE);
702706
}
703707
else
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
remote() 1 shard: 100
2+
remote() 3 shards: 300
3+
Distributed: 100
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
drop table if exists 03444_local;
2+
drop table if exists 03444_distr;
3+
4+
create table 03444_local (id UInt32) engine = MergeTree order by id as select * from numbers(100);
5+
create table 03444_distr (id UInt32) engine = Distributed(test_cluster_one_shard_two_replicas, currentDatabase(), 03444_local);
6+
7+
select 'remote() 1 shard: ' || count()
8+
from remote('127.0.0.1|127.0.0.2|127.0.0.3', currentDatabase(), 03444_local)
9+
settings prefer_localhost_replica = 0,
10+
use_hedged_requests = 0,
11+
max_parallel_replicas = 100;
12+
13+
select 'remote() 3 shards: ' || count()
14+
from remote('127.0.0.1|127.0.0.2,127.0.0.3|127.0.0.4,127.0.0.5|127.0.0.6', currentDatabase(), 03444_local)
15+
settings prefer_localhost_replica = 0,
16+
use_hedged_requests = 0,
17+
max_parallel_replicas = 100;
18+
19+
select 'Distributed: ' || count()
20+
from 03444_distr
21+
settings prefer_localhost_replica = 0,
22+
use_hedged_requests = 0,
23+
max_parallel_replicas = 100;
24+
25+
drop table 03444_local;
26+
drop table 03444_distr;

0 commit comments

Comments
 (0)