Skip to content

Commit 628904a

Browse files
committed
Remove useless allow_retries_in_cluster_requests setting
1 parent 591b199 commit 628904a

File tree

5 files changed

+15
-74
lines changed

5 files changed

+15
-74
lines changed

src/Core/Settings.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7103,9 +7103,6 @@ Use Shuffle aggregation strategy instead of PartialAggregation + Merge in distri
71037103
)", EXPERIMENTAL) \
71047104
DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"(
71057105
Allow Iceberg read optimization based on Iceberg metadata.
7106-
)", EXPERIMENTAL) \
7107-
DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
7108-
Allow retries in cluster request, when one node goes offline
71097106
)", EXPERIMENTAL) \
71107107
DECLARE(Bool, object_storage_remote_initiator, false, R"(
71117108
Execute request to object storage as remote on one of object_storage_cluster nodes.
@@ -7227,6 +7224,7 @@ Sets the evaluation time to be used with promql dialect. 'auto' means the curren
72277224
MAKE_OBSOLETE(M, Bool, allow_experimental_shared_set_join, true) \
72287225
MAKE_OBSOLETE(M, UInt64, min_external_sort_block_bytes, 100_MiB) \
72297226
MAKE_OBSOLETE(M, UInt64, distributed_cache_read_alignment, 0) \
7227+
MAKE_OBSOLETE(M, Bool, allow_retries_in_cluster_requests, false) \
72307228
/** The section above is for obsolete settings. Do not add anything there. */
72317229
#endif /// __CLION_IDE__
72327230

src/QueryPipeline/RemoteQueryExecutor.cpp

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ namespace Setting
5252
extern const SettingsBool use_hedged_requests;
5353
extern const SettingsBool push_external_roles_in_interserver_queries;
5454
extern const SettingsMilliseconds parallel_replicas_connect_timeout_ms;
55-
extern const SettingsBool allow_retries_in_cluster_requests;
5655
}
5756

5857
namespace ErrorCodes
@@ -83,7 +82,6 @@ RemoteQueryExecutor::RemoteQueryExecutor(
8382
, extension(extension_)
8483
, priority_func(priority_func_)
8584
, read_packet_type_separately(context->canUseParallelReplicasOnInitiator() && !context->getSettingsRef()[Setting::use_hedged_requests])
86-
, allow_retries_in_cluster_requests(context->getSettingsRef()[Setting::allow_retries_in_cluster_requests])
8785
{
8886
if (stage == QueryProcessingStage::QueryPlan && !query_plan)
8987
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query plan is not passed for QueryPlan processing stage");
@@ -468,8 +466,7 @@ int RemoteQueryExecutor::sendQueryAsync()
468466
read_context = std::make_unique<ReadContext>(
469467
*this,
470468
/*suspend_when_query_sent*/ true,
471-
read_packet_type_separately,
472-
allow_retries_in_cluster_requests);
469+
read_packet_type_separately);
473470

474471
/// If query already sent, do nothing. Note that we cannot use sent_query flag here,
475472
/// because we can still be in process of sending scalars or external tables.
@@ -542,8 +539,7 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
542539
read_context = std::make_unique<ReadContext>(
543540
*this,
544541
/*suspend_when_query_sent*/ false,
545-
read_packet_type_separately,
546-
allow_retries_in_cluster_requests);
542+
read_packet_type_separately);
547543
recreate_read_context = false;
548544
}
549545

@@ -734,18 +730,6 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
734730
break;
735731

736732
case Protocol::Server::ConnectionLost:
737-
if (allow_retries_in_cluster_requests)
738-
{
739-
if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
740-
{
741-
if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica))
742-
{
743-
finished = true;
744-
extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica);
745-
return ReadResult(Block{});
746-
}
747-
}
748-
}
749733
packet.exception->rethrow();
750734
break;
751735

@@ -1016,11 +1000,6 @@ void RemoteQueryExecutor::setProfileInfoCallback(ProfileInfoCallback callback)
10161000
profile_info_callback = std::move(callback);
10171001
}
10181002

1019-
bool RemoteQueryExecutor::skipUnavailableShards() const
1020-
{
1021-
return context->getSettingsRef()[Setting::skip_unavailable_shards];
1022-
}
1023-
10241003
bool RemoteQueryExecutor::needToSkipUnavailableShard() const
10251004
{
10261005
return context->getSettingsRef()[Setting::skip_unavailable_shards] && (0 == connections->size());

src/QueryPipeline/RemoteQueryExecutor.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,6 @@ class RemoteQueryExecutor
233233

234234
IConnections & getConnections() { return *connections; }
235235

236-
bool skipUnavailableShards() const;
237-
238236
bool needToSkipUnavailableShard() const;
239237

240238
bool isReplicaUnavailable() const { return extension && extension->parallel_reading_coordinator && connections->size() == 0; }
@@ -339,8 +337,6 @@ class RemoteQueryExecutor
339337

340338
const bool read_packet_type_separately = false;
341339

342-
const bool allow_retries_in_cluster_requests = false;
343-
344340
std::unordered_set<size_t> replica_has_processed_data;
345341

346342
/// Send all scalars to remote servers

src/QueryPipeline/RemoteQueryExecutorReadContext.cpp

Lines changed: 11 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,11 @@ namespace ErrorCodes
2222
RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(
2323
RemoteQueryExecutor & executor_,
2424
bool suspend_when_query_sent_,
25-
bool read_packet_type_separately_,
26-
bool allow_retries_in_cluster_requests_)
25+
bool read_packet_type_separately_)
2726
: AsyncTaskExecutor(std::make_unique<Task>(*this))
2827
, executor(executor_)
2928
, suspend_when_query_sent(suspend_when_query_sent_)
3029
, read_packet_type_separately(read_packet_type_separately_)
31-
, allow_retries_in_cluster_requests(allow_retries_in_cluster_requests_)
3230
{
3331
if (-1 == pipe2(pipe_fd, O_NONBLOCK))
3432
throw ErrnoException(ErrorCodes::CANNOT_OPEN_FILE, "Cannot create pipe");
@@ -59,49 +57,21 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus
5957
if (read_context.executor.needToSkipUnavailableShard())
6058
return;
6159

62-
try
60+
while (true)
6361
{
64-
while (true)
65-
{
66-
try
67-
{
68-
read_context.has_read_packet_part = PacketPart::None;
69-
70-
if (read_context.read_packet_type_separately)
71-
{
72-
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
73-
read_context.has_read_packet_part = PacketPart::Type;
74-
suspend_callback();
75-
}
76-
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
77-
read_context.has_read_packet_part = PacketPart::Body;
78-
if (read_context.packet.type == Protocol::Server::Data)
79-
read_context.has_data_packets = true;
80-
}
81-
catch (const Exception & e)
82-
{
83-
/// If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes.
84-
/// If initiator did not process any data packets before, this fact can be ignored.
85-
/// Unprocessed tasks will be executed on other nodes.
86-
if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
87-
&& !read_context.has_data_packets.load() && read_context.executor.skipUnavailableShards())
88-
{
89-
read_context.has_read_packet_part = PacketPart::None;
90-
}
91-
else
92-
throw;
93-
}
62+
read_context.has_read_packet_part = PacketPart::None;
9463

64+
if (read_context.read_packet_type_separately)
65+
{
66+
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
67+
read_context.has_read_packet_part = PacketPart::Type;
9568
suspend_callback();
9669
}
97-
}
98-
catch (const Exception &)
99-
{
100-
if (!read_context.allow_retries_in_cluster_requests)
101-
throw;
102-
read_context.packet.type = Protocol::Server::ConnectionLost;
103-
read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode());
70+
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
10471
read_context.has_read_packet_part = PacketPart::Body;
72+
if (read_context.packet.type == Protocol::Server::Data)
73+
read_context.has_data_packets = true;
74+
10575
suspend_callback();
10676
}
10777
}

src/QueryPipeline/RemoteQueryExecutorReadContext.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
2828
explicit RemoteQueryExecutorReadContext(
2929
RemoteQueryExecutor & executor_,
3030
bool suspend_when_query_sent_,
31-
bool read_packet_type_separately_,
32-
bool allow_retries_in_cluster_requests_);
31+
bool read_packet_type_separately_);
3332

3433
~RemoteQueryExecutorReadContext() override;
3534

@@ -112,7 +111,6 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
112111
bool suspend_when_query_sent = false;
113112
bool is_query_sent = false;
114113
const bool read_packet_type_separately = false;
115-
const bool allow_retries_in_cluster_requests = false;
116114
};
117115

118116
}

0 commit comments

Comments
 (0)