Skip to content

Commit b1978ae

Browse files
authored
Merge pull request #1201 from Altinity/bugfix/antalya-25.8/segfault_on_unexpected_shutdown
Fix segfault on unexpected node shutdown
2 parents 66a6bf5 + be61867 commit b1978ae

File tree

5 files changed

+34
-57
lines changed

5 files changed

+34
-57
lines changed

src/Core/Settings.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7103,9 +7103,6 @@ Use Shuffle aggregation strategy instead of PartialAggregation + Merge in distri
71037103
)", EXPERIMENTAL) \
71047104
DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"(
71057105
Allow Iceberg read optimization based on Iceberg metadata.
7106-
)", EXPERIMENTAL) \
7107-
DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
7108-
Allow retries in cluster request, when one node goes offline
71097106
)", EXPERIMENTAL) \
71107107
DECLARE(Bool, object_storage_remote_initiator, false, R"(
71117108
Execute request to object storage as remote on one of object_storage_cluster nodes.
@@ -7228,6 +7225,7 @@ Sets the evaluation time to be used with promql dialect. 'auto' means the curren
72287225
MAKE_OBSOLETE(M, Bool, allow_experimental_shared_set_join, true) \
72297226
MAKE_OBSOLETE(M, UInt64, min_external_sort_block_bytes, 100_MiB) \
72307227
MAKE_OBSOLETE(M, UInt64, distributed_cache_read_alignment, 0) \
7228+
MAKE_OBSOLETE(M, Bool, allow_retries_in_cluster_requests, false) \
72317229
/** The section above is for obsolete settings. Do not add anything there. */
72327230
#endif /// __CLION_IDE__
72337231

src/QueryPipeline/RemoteQueryExecutor.cpp

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ namespace Setting
5252
extern const SettingsBool use_hedged_requests;
5353
extern const SettingsBool push_external_roles_in_interserver_queries;
5454
extern const SettingsMilliseconds parallel_replicas_connect_timeout_ms;
55-
extern const SettingsBool allow_retries_in_cluster_requests;
5655
}
5756

5857
namespace ErrorCodes
@@ -83,7 +82,6 @@ RemoteQueryExecutor::RemoteQueryExecutor(
8382
, extension(extension_)
8483
, priority_func(priority_func_)
8584
, read_packet_type_separately(context->canUseParallelReplicasOnInitiator() && !context->getSettingsRef()[Setting::use_hedged_requests])
86-
, allow_retries_in_cluster_requests(context->getSettingsRef()[Setting::allow_retries_in_cluster_requests])
8785
{
8886
if (stage == QueryProcessingStage::QueryPlan && !query_plan)
8987
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query plan is not passed for QueryPlan processing stage");
@@ -469,8 +467,7 @@ int RemoteQueryExecutor::sendQueryAsync()
469467
read_context = std::make_unique<ReadContext>(
470468
*this,
471469
/*suspend_when_query_sent*/ true,
472-
read_packet_type_separately,
473-
allow_retries_in_cluster_requests);
470+
read_packet_type_separately);
474471

475472
/// If query already sent, do nothing. Note that we cannot use sent_query flag here,
476473
/// because we can still be in process of sending scalars or external tables.
@@ -543,8 +540,7 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
543540
read_context = std::make_unique<ReadContext>(
544541
*this,
545542
/*suspend_when_query_sent*/ false,
546-
read_packet_type_separately,
547-
allow_retries_in_cluster_requests);
543+
read_packet_type_separately);
548544
recreate_read_context = false;
549545
}
550546

@@ -735,16 +731,13 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
735731
break;
736732

737733
case Protocol::Server::ConnectionLost:
738-
if (allow_retries_in_cluster_requests)
734+
if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
739735
{
740-
if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
736+
if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica))
741737
{
742-
if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica))
743-
{
744-
finished = true;
745-
extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica);
746-
return ReadResult(Block{});
747-
}
738+
finished = true;
739+
extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica);
740+
return ReadResult(Block{});
748741
}
749742
}
750743
packet.exception->rethrow();

src/QueryPipeline/RemoteQueryExecutor.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,8 +339,6 @@ class RemoteQueryExecutor
339339

340340
const bool read_packet_type_separately = false;
341341

342-
const bool allow_retries_in_cluster_requests = false;
343-
344342
std::unordered_set<size_t> replica_has_processed_data;
345343

346344
/// Send all scalars to remote servers

src/QueryPipeline/RemoteQueryExecutorReadContext.cpp

Lines changed: 25 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,11 @@ namespace ErrorCodes
2222
RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(
2323
RemoteQueryExecutor & executor_,
2424
bool suspend_when_query_sent_,
25-
bool read_packet_type_separately_,
26-
bool allow_retries_in_cluster_requests_)
25+
bool read_packet_type_separately_)
2726
: AsyncTaskExecutor(std::make_unique<Task>(*this))
2827
, executor(executor_)
2928
, suspend_when_query_sent(suspend_when_query_sent_)
3029
, read_packet_type_separately(read_packet_type_separately_)
31-
, allow_retries_in_cluster_requests(allow_retries_in_cluster_requests_)
3230
{
3331
if (-1 == pipe2(pipe_fd, O_NONBLOCK))
3432
throw ErrnoException(ErrorCodes::CANNOT_OPEN_FILE, "Cannot create pipe");
@@ -63,46 +61,38 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus
6361
{
6462
while (true)
6563
{
66-
try
67-
{
68-
read_context.has_read_packet_part = PacketPart::None;
69-
70-
if (read_context.read_packet_type_separately)
71-
{
72-
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
73-
read_context.has_read_packet_part = PacketPart::Type;
74-
suspend_callback();
75-
}
76-
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
77-
read_context.has_read_packet_part = PacketPart::Body;
78-
if (read_context.packet.type == Protocol::Server::Data)
79-
read_context.has_data_packets = true;
80-
}
81-
catch (const Exception & e)
64+
read_context.has_read_packet_part = PacketPart::None;
65+
66+
if (read_context.read_packet_type_separately)
8267
{
83-
/// If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes.
84-
/// If initiator did not process any data packets before, this fact can be ignored.
85-
/// Unprocessed tasks will be executed on other nodes.
86-
if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
87-
&& !read_context.has_data_packets.load() && read_context.executor.skipUnavailableShards())
88-
{
89-
read_context.has_read_packet_part = PacketPart::None;
90-
}
91-
else
92-
throw;
68+
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
69+
read_context.has_read_packet_part = PacketPart::Type;
70+
suspend_callback();
9371
}
72+
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
73+
read_context.has_read_packet_part = PacketPart::Body;
74+
if (read_context.packet.type == Protocol::Server::Data)
75+
read_context.has_data_packets = true;
9476

9577
suspend_callback();
9678
}
9779
}
98-
catch (const Exception &)
80+
catch (const Exception & e)
9981
{
100-
if (!read_context.allow_retries_in_cluster_requests)
82+
/// If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes.
83+
/// If initiator did not process any data packets before, this fact can be ignored.
84+
/// Unprocessed tasks will be executed on other nodes.
85+
if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
86+
&& !read_context.has_data_packets.load()
87+
&& read_context.executor.skipUnavailableShards())
88+
{
89+
read_context.packet.type = Protocol::Server::ConnectionLost;
90+
read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode());
91+
read_context.has_read_packet_part = PacketPart::Body;
92+
suspend_callback();
93+
}
94+
else
10195
throw;
102-
read_context.packet.type = Protocol::Server::ConnectionLost;
103-
read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode());
104-
read_context.has_read_packet_part = PacketPart::Body;
105-
suspend_callback();
10696
}
10797
}
10898

src/QueryPipeline/RemoteQueryExecutorReadContext.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
2828
explicit RemoteQueryExecutorReadContext(
2929
RemoteQueryExecutor & executor_,
3030
bool suspend_when_query_sent_,
31-
bool read_packet_type_separately_,
32-
bool allow_retries_in_cluster_requests_);
31+
bool read_packet_type_separately_);
3332

3433
~RemoteQueryExecutorReadContext() override;
3534

@@ -112,7 +111,6 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
112111
bool suspend_when_query_sent = false;
113112
bool is_query_sent = false;
114113
const bool read_packet_type_separately = false;
115-
const bool allow_retries_in_cluster_requests = false;
116114
};
117115

118116
}

0 commit comments

Comments
 (0)